This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 60c9921c2 refactor: use basicRes everywhere except the `service` 
module (#3943)
60c9921c2 is described below

commit 60c9921c2c4cb832782a7a2d590326b19f297ac6
Author: Klesh Wong <[email protected]>
AuthorDate: Fri Dec 16 09:58:13 2022 +0800

    refactor: use basicRes everywhere except the `service` module (#3943)
    
    * docs: update comment to reflect latest status
    
    * refactor: use basicRes everywhere except service
    
    * fix: linting and tests
    
    * fix: task logging files were misplaced
    
    * fix: tasks should be ordered in run_pipeline
---
 .gitignore                                       |   3 +-
 Makefile                                         |   7 +-
 helpers/e2ehelper/data_flow_tester.go            |   2 +-
 helpers/e2ehelper/data_flow_tester_test.go       |  21 +---
 impl/default_basic_res.go                        |  18 +++
 models/task.go                                   |  14 +++
 plugins/ae/api/init.go                           |   6 +-
 plugins/ae/impl/impl.go                          |   6 +-
 plugins/azure/api/init.go                        |   6 +-
 plugins/azure/impl/impl.go                       |   6 +-
 plugins/bitbucket/api/init.go                    |   6 +-
 plugins/bitbucket/impl/impl.go                   |   6 +-
 plugins/core/basic_res.go                        |   2 +
 plugins/core/dal/dal.go                          |   4 +
 plugins/core/plugin_init.go                      |   4 +-
 plugins/customize/impl/impl.go                   |   6 +-
 plugins/dora/api/data.go                         |  64 ----------
 plugins/dora/impl/impl.go                        |   8 +-
 plugins/feishu/api/init.go                       |   6 +-
 plugins/feishu/impl/impl.go                      |  35 +-----
 plugins/gitee/api/init.go                        |   6 +-
 plugins/gitee/impl/impl.go                       |   6 +-
 plugins/gitextractor/main.go                     |  29 +++--
 plugins/gitextractor/parser/repo.go              |   1 +
 plugins/github/api/init.go                       |   6 +-
 plugins/github/impl/impl.go                      |  10 +-
 plugins/github_graphql/impl/impl.go              |  10 +-
 plugins/gitlab/api/blueprint.go                  |   2 +-
 plugins/gitlab/api/blueprint_V200_test.go        |   4 +-
 plugins/gitlab/api/blueprint_v200.go             |   4 +-
 plugins/gitlab/api/connection.go                 |   2 +-
 plugins/gitlab/api/init.go                       |  10 +-
 plugins/gitlab/api/proxy.go                      |   2 +-
 plugins/gitlab/api/scope.go                      |  14 +--
 plugins/gitlab/api/transformation_rule.go        |  10 +-
 plugins/gitlab/impl/impl.go                      |   7 +-
 plugins/helper/default_task_context.go           |  93 +++-----------
 plugins/icla/impl/impl.go                        |   4 +-
 plugins/jenkins/api/init.go                      |   6 +-
 plugins/jenkins/impl/impl.go                     |   9 +-
 plugins/jira/api/init.go                         |   6 +-
 plugins/jira/impl/impl.go                        |   9 +-
 plugins/org/api/handlers.go                      |   8 +-
 plugins/org/impl/impl.go                         |   8 +-
 plugins/pagerduty/api/init.go                    |   6 +-
 plugins/pagerduty/impl/impl.go                   |   9 +-
 plugins/refdiff/impl/impl.go                     |   7 --
 plugins/tapd/api/init.go                         |  10 +-
 plugins/tapd/impl/impl.go                        |   6 +-
 plugins/webhook/api/init.go                      |   6 +-
 plugins/webhook/impl/impl.go                     |   6 +-
 plugins/zentao/api/init.go                       |   6 +-
 plugins/zentao/impl/impl.go                      |   7 +-
 plugins/azure/api/init.go => runner/basic_res.go |  32 +++--
 runner/directrun.go                              |  40 +++---
 runner/loader.go                                 |   9 +-
 runner/run_pipeline.go                           |  45 +++----
 runner/run_task.go                               | 149 ++++++++++-------------
 scripts/compile-plugins.sh                       |   8 +-
 services/init.go                                 |  50 ++++----
 services/pipeline_runner.go                      |   4 +-
 services/task.go                                 |   6 +-
 worker/app/pipeline_workflow.go                  |   7 +-
 worker/app/shared.go                             |  17 +--
 worker/app/task_activity.go                      |   8 +-
 worker/main.go                                   |  17 +--
 66 files changed, 363 insertions(+), 588 deletions(-)

diff --git a/.gitignore b/.gitignore
index dda6d7670..452d5aa1a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -110,6 +110,7 @@ dist
 
 # Stores VSCode versions used for testing VSCode extensions
 .vscode-test
+_debug_bin
 
 # yarn v2
 .yarn/cache
@@ -157,4 +158,4 @@ mocks/
 api/docs/swagger.json
 api/docs/swagger.yaml
 api/docs/docs.go
-*.result/
+*.result/
\ No newline at end of file
diff --git a/Makefile b/Makefile
index e818a7349..9df8fb0e0 100644
--- a/Makefile
+++ b/Makefile
@@ -22,12 +22,17 @@ TAG ?= $(shell git tag --points-at HEAD)
 IMAGE_REPO ?= "apache"
 VERSION = $(TAG)@$(SHA)
 
-dep:
+go-dep:
        go install github.com/vektra/mockery/v2@latest
        go install github.com/swaggo/swag/cmd/[email protected]
        go install github.com/atombender/go-jsonschema/cmd/gojsonschema@latest
+       go install github.com/golangci/golangci-lint/cmd/[email protected]
+
+python-dep:
        pip install -r requirements.txt
 
+dep: go-dep python-dep
+
 swag:
        swag init --parseDependency --parseInternal -o ./api/docs -g 
./api/api.go -g plugins/*/api/*.go
        @echo "visit the swagger document on 
http://localhost:8080/swagger/index.html";
diff --git a/helpers/e2ehelper/data_flow_tester.go 
b/helpers/e2ehelper/data_flow_tester.go
index f2c32d536..eb8c07392 100644
--- a/helpers/e2ehelper/data_flow_tester.go
+++ b/helpers/e2ehelper/data_flow_tester.go
@@ -191,7 +191,7 @@ func (t *DataFlowTester) Subtask(subtaskMeta 
core.SubTaskMeta, taskData interfac
 
 // SubtaskContext creates a subtask context
 func (t *DataFlowTester) SubtaskContext(taskData interface{}) 
core.SubTaskContext {
-       return helper.NewStandaloneSubTaskContext(context.Background(), t.Cfg, 
t.Log, t.Db, t.Name, taskData)
+       return helper.NewStandaloneSubTaskContext(context.Background(), 
runner.CreateBasicRes(t.Cfg, t.Log, t.Db), t.Name, taskData)
 }
 
 func filterColumn(column dal.ColumnMeta, opts TableOptions) bool {
diff --git a/helpers/e2ehelper/data_flow_tester_test.go 
b/helpers/e2ehelper/data_flow_tester_test.go
index c13d99459..77bd1649d 100644
--- a/helpers/e2ehelper/data_flow_tester_test.go
+++ b/helpers/e2ehelper/data_flow_tester_test.go
@@ -53,29 +53,16 @@ func ExampleDataFlowTester() {
        }
 
        // import raw data table
-       
dataflowTester.ImportCsvIntoRawTable("./tables/_raw_gitlab_api_projects.csv", 
"_raw_gitlab_api_project")
+       
dataflowTester.ImportCsvIntoRawTable("./tables/_raw_gitlab_api_issues.csv", 
"_raw_gitlab_api_issues")
 
        // verify extraction
        dataflowTester.FlushTabler(gitlabModels.GitlabProject{})
-       dataflowTester.Subtask(tasks.ExtractProjectMeta, taskData)
+       dataflowTester.Subtask(tasks.ExtractApiIssuesMeta, taskData)
        dataflowTester.VerifyTable(
-               gitlabModels.GitlabProject{},
-               "tables/_tool_gitlab_projects.csv",
+               gitlabModels.GitlabIssue{},
+               "tables/_tool_gitlab_issues.csv",
                []string{
                        "gitlab_id",
-                       "name",
-                       "description",
-                       "default_branch",
-                       "path_with_namespace",
-                       "web_url",
-                       "creator_id",
-                       "visibility",
-                       "open_issues_count",
-                       "star_count",
-                       "forked_from_project_id",
-                       "forked_from_project_web_url",
-                       "created_date",
-                       "updated_date",
                        "_raw_data_params",
                        "_raw_data_table",
                        "_raw_data_id",
diff --git a/impl/default_basic_res.go b/impl/default_basic_res.go
index 8b5233337..5fc782990 100644
--- a/impl/default_basic_res.go
+++ b/impl/default_basic_res.go
@@ -45,6 +45,24 @@ func (c *DefaultBasicRes) GetLogger() core.Logger {
        return c.logger
 }
 
+// NestedLogger returns a new DefaultBasicRes with a new nested logger
+func (c *DefaultBasicRes) NestedLogger(name string) core.BasicRes {
+       return &DefaultBasicRes{
+               cfg:    c.cfg,
+               logger: c.logger.Nested(name),
+               db:     c.db,
+       }
+}
+
+// ReplaceLogger returns a new DefaultBasicRes with the specified logger
+func (c *DefaultBasicRes) ReplaceLogger(logger core.Logger) core.BasicRes {
+       return &DefaultBasicRes{
+               cfg:    c.cfg,
+               logger: logger,
+               db:     c.db,
+       }
+}
+
 // NewDefaultBasicRes creates a new DefaultBasicRes instance
 func NewDefaultBasicRes(
        cfg *viper.Viper,
diff --git a/models/task.go b/models/task.go
index 0dd571a04..bfc9d8ab7 100644
--- a/models/task.go
+++ b/models/task.go
@@ -18,8 +18,10 @@ limitations under the License.
 package models
 
 import (
+       "encoding/json"
        "time"
 
+       "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/models/common"
        "github.com/apache/incubator-devlake/plugins/core"
        "gorm.io/datatypes"
@@ -89,3 +91,15 @@ func (Task) TableName() string {
 func (Subtask) TableName() string {
        return "_devlake_subtasks"
 }
+
+func (task *Task) GetSubTasks() ([]string, errors.Error) {
+       var subtasks []string
+       err := errors.Convert(json.Unmarshal(task.Subtasks, &subtasks))
+       return subtasks, err
+}
+
+func (task *Task) GetOptions() (map[string]interface{}, errors.Error) {
+       var options map[string]interface{}
+       err := errors.Convert(json.Unmarshal(task.Options, &options))
+       return options, err
+}
diff --git a/plugins/ae/api/init.go b/plugins/ae/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/ae/api/init.go
+++ b/plugins/ae/api/init.go
@@ -21,16 +21,14 @@ import (
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/go-playground/validator/v10"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var vld *validator.Validate
 var connectionHelper *helper.ConnectionApiHelper
 var basicRes core.BasicRes
 
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-       basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+       basicRes = br
        vld = validator.New()
        connectionHelper = helper.NewConnectionHelper(
                basicRes,
diff --git a/plugins/ae/impl/impl.go b/plugins/ae/impl/impl.go
index 3532720e3..8978ff8e1 100644
--- a/plugins/ae/impl/impl.go
+++ b/plugins/ae/impl/impl.go
@@ -27,8 +27,6 @@ import (
        "github.com/apache/incubator-devlake/plugins/ae/tasks"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var _ core.PluginMeta = (*AE)(nil)
@@ -41,8 +39,8 @@ var _ core.CloseablePluginTask = (*AE)(nil)
 
 type AE struct{}
 
-func (plugin AE) Init(config *viper.Viper, logger core.Logger, db *gorm.DB) 
errors.Error {
-       api.Init(config, logger, db)
+func (plugin AE) Init(basicRes core.BasicRes) errors.Error {
+       api.Init(basicRes)
        return nil
 }
 
diff --git a/plugins/azure/api/init.go b/plugins/azure/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/azure/api/init.go
+++ b/plugins/azure/api/init.go
@@ -21,16 +21,14 @@ import (
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/go-playground/validator/v10"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var vld *validator.Validate
 var connectionHelper *helper.ConnectionApiHelper
 var basicRes core.BasicRes
 
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-       basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+       basicRes = br
        vld = validator.New()
        connectionHelper = helper.NewConnectionHelper(
                basicRes,
diff --git a/plugins/azure/impl/impl.go b/plugins/azure/impl/impl.go
index 29328c3a6..a7bc49de5 100644
--- a/plugins/azure/impl/impl.go
+++ b/plugins/azure/impl/impl.go
@@ -27,8 +27,6 @@ import (
        "github.com/apache/incubator-devlake/plugins/azure/tasks"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 // make sure interface is implemented
@@ -49,8 +47,8 @@ func (plugin Azure) Description() string {
        return "collect some Azure data"
 }
 
-func (plugin Azure) Init(config *viper.Viper, logger core.Logger, db *gorm.DB) 
errors.Error {
-       api.Init(config, logger, db)
+func (plugin Azure) Init(basicRes core.BasicRes) errors.Error {
+       api.Init(basicRes)
        return nil
 }
 
diff --git a/plugins/bitbucket/api/init.go b/plugins/bitbucket/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/bitbucket/api/init.go
+++ b/plugins/bitbucket/api/init.go
@@ -21,16 +21,14 @@ import (
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/go-playground/validator/v10"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var vld *validator.Validate
 var connectionHelper *helper.ConnectionApiHelper
 var basicRes core.BasicRes
 
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-       basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+       basicRes = br
        vld = validator.New()
        connectionHelper = helper.NewConnectionHelper(
                basicRes,
diff --git a/plugins/bitbucket/impl/impl.go b/plugins/bitbucket/impl/impl.go
index 37e53edc9..14279749e 100644
--- a/plugins/bitbucket/impl/impl.go
+++ b/plugins/bitbucket/impl/impl.go
@@ -27,8 +27,6 @@ import (
        "github.com/apache/incubator-devlake/plugins/bitbucket/tasks"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var _ core.PluginMeta = (*Bitbucket)(nil)
@@ -42,8 +40,8 @@ var _ core.CloseablePluginTask = (*Bitbucket)(nil)
 
 type Bitbucket string
 
-func (plugin Bitbucket) Init(config *viper.Viper, logger core.Logger, db 
*gorm.DB) errors.Error {
-       api.Init(config, logger, db)
+func (plugin Bitbucket) Init(basicRes core.BasicRes) errors.Error {
+       api.Init(basicRes)
        return nil
 }
 
diff --git a/plugins/core/basic_res.go b/plugins/core/basic_res.go
index f9ff60856..9460296dc 100644
--- a/plugins/core/basic_res.go
+++ b/plugins/core/basic_res.go
@@ -26,4 +26,6 @@ type BasicRes interface {
        GetConfig(name string) string
        GetLogger() Logger
        GetDal() dal.Dal
+       NestedLogger(name string) BasicRes
+       ReplaceLogger(logger Logger) BasicRes
 }
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index 6b1d80214..46077b753 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -293,3 +293,7 @@ const LockClause string = "Lock"
 func Lock(write bool, nowait bool) Clause {
        return Clause{Type: LockClause, Data: []bool{write, nowait}}
 }
+
+func Expr(expr string, params ...interface{}) DalClause {
+       return DalClause{Expr: expr, Params: params}
+}
diff --git a/plugins/core/plugin_init.go b/plugins/core/plugin_init.go
index 974cc76f0..749cd5be2 100644
--- a/plugins/core/plugin_init.go
+++ b/plugins/core/plugin_init.go
@@ -19,11 +19,9 @@ package core
 
 import (
        "github.com/apache/incubator-devlake/errors"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 // PluginInit Implement this interface if plugin needed some initialization
 type PluginInit interface {
-       Init(config *viper.Viper, logger Logger, db *gorm.DB) errors.Error
+       Init(basicRes BasicRes) errors.Error
 }
diff --git a/plugins/customize/impl/impl.go b/plugins/customize/impl/impl.go
index 9f196714a..6be99337a 100644
--- a/plugins/customize/impl/impl.go
+++ b/plugins/customize/impl/impl.go
@@ -23,10 +23,7 @@ import (
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/customize/api"
        "github.com/apache/incubator-devlake/plugins/customize/tasks"
-       "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/mitchellh/mapstructure"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var _ core.PluginMeta = (*Customize)(nil)
@@ -38,8 +35,7 @@ type Customize struct {
        handlers *api.Handlers
 }
 
-func (plugin *Customize) Init(config *viper.Viper, logger core.Logger, db 
*gorm.DB) errors.Error {
-       basicRes := helper.NewDefaultBasicRes(config, logger, db)
+func (plugin *Customize) Init(basicRes core.BasicRes) errors.Error {
        plugin.handlers = api.NewHandlers(basicRes.GetDal())
        return nil
 }
diff --git a/plugins/dora/api/data.go b/plugins/dora/api/data.go
deleted file mode 100644
index 6a0ff30f4..000000000
--- a/plugins/dora/api/data.go
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package api
-
-import (
-       "github.com/apache/incubator-devlake/errors"
-       "github.com/apache/incubator-devlake/plugins/core"
-       "net/http"
-)
-
-const RAW_DEPLOYMENTS_TABLE = `dora_deplyments`
-
-//TODO Please modify the folowing code to adapt to your plugin
-/*
-POST /plugins/dora/deployments
-{
-       TODO
-}
-*/
-func PostDeployments(input *core.ApiResourceInput) (*core.ApiResourceOutput, 
errors.Error) {
-       // TODO
-       return &core.ApiResourceOutput{Body: nil, Status: http.StatusOK}, nil
-}
-
-const RAW_ISSUES_TABLE = `dora_issues`
-
-//TODO Please modify the folowing code to adapt to your plugin
-/*
-POST /plugins/dora/issues
-{
-       TODO
-}
-*/
-func PostIssues(input *core.ApiResourceInput) (*core.ApiResourceOutput, 
errors.Error) {
-       // TODO
-       return &core.ApiResourceOutput{Body: nil, Status: http.StatusOK}, nil
-}
-
-//TODO Please modify the folowing code to adapt to your plugin
-/*
-POST /plugins/dora/issues/:id/close
-{
-       TODO
-}
-*/
-func CloseIssues(input *core.ApiResourceInput) (*core.ApiResourceOutput, 
errors.Error) {
-       // TODO
-       return &core.ApiResourceOutput{Body: nil, Status: http.StatusOK}, nil
-}
diff --git a/plugins/dora/impl/impl.go b/plugins/dora/impl/impl.go
index ca5a8f203..a3815e06b 100644
--- a/plugins/dora/impl/impl.go
+++ b/plugins/dora/impl/impl.go
@@ -19,17 +19,15 @@ package impl
 
 import (
        "encoding/json"
+
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/plugins/core"
        
"github.com/apache/incubator-devlake/plugins/dora/models/migrationscripts"
        "github.com/apache/incubator-devlake/plugins/dora/tasks"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 // make sure interface is implemented
 var _ core.PluginMeta = (*Dora)(nil)
-var _ core.PluginInit = (*Dora)(nil)
 var _ core.PluginTask = (*Dora)(nil)
 var _ core.PluginModel = (*Dora)(nil)
 var _ core.PluginMetric = (*Dora)(nil)
@@ -53,10 +51,6 @@ func (plugin Dora) SvgIcon() string {
 </svg>`
 }
 
-func (plugin Dora) Init(config *viper.Viper, logger core.Logger, db *gorm.DB) 
errors.Error {
-       return nil
-}
-
 func (plugin Dora) RequiredDataEntities() (data []map[string]interface{}, err 
errors.Error) {
        return []map[string]interface{}{
                {
diff --git a/plugins/feishu/api/init.go b/plugins/feishu/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/feishu/api/init.go
+++ b/plugins/feishu/api/init.go
@@ -21,16 +21,14 @@ import (
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/go-playground/validator/v10"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var vld *validator.Validate
 var connectionHelper *helper.ConnectionApiHelper
 var basicRes core.BasicRes
 
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-       basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+       basicRes = br
        vld = validator.New()
        connectionHelper = helper.NewConnectionHelper(
                basicRes,
diff --git a/plugins/feishu/impl/impl.go b/plugins/feishu/impl/impl.go
index 98d61102d..aa637f7c5 100644
--- a/plugins/feishu/impl/impl.go
+++ b/plugins/feishu/impl/impl.go
@@ -22,9 +22,6 @@ import (
 
        "github.com/apache/incubator-devlake/errors"
 
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
-
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/feishu/api"
        "github.com/apache/incubator-devlake/plugins/feishu/models"
@@ -43,36 +40,8 @@ var _ core.CloseablePluginTask = (*Feishu)(nil)
 
 type Feishu struct{}
 
-func (plugin Feishu) Init(config *viper.Viper, logger core.Logger, db 
*gorm.DB) errors.Error {
-       api.Init(config, logger, db)
-
-       // FIXME after config-ui support feishu plugin
-       // save env to db where name=feishu
-       connection := &models.FeishuConnection{}
-       if db.Migrator().HasTable(connection) {
-               if err := db.Find(connection, map[string]string{"name": 
"Feishu"}).Error; err != nil {
-                       return errors.Convert(err)
-               }
-               if connection.ID != 0 {
-                       encodeKey := config.GetString(core.EncodeKeyEnvStr)
-                       connection.Endpoint = 
config.GetString(`FEISHU_ENDPOINT`)
-                       connection.AppId = config.GetString(`FEISHU_APPID`)
-                       connection.SecretKey = 
config.GetString(`FEISHU_APPSCRECT`)
-                       if connection.Endpoint != `` && connection.AppId != `` 
&& connection.SecretKey != `` && encodeKey != `` {
-                               err := helper.UpdateEncryptFields(connection, 
func(plaintext string) (string, errors.Error) {
-                                       return core.Encrypt(encodeKey, 
plaintext)
-                               })
-                               if err != nil {
-                                       return err
-                               }
-                               // update from .env and save to db
-                               err = 
errors.Convert(db.Updates(connection).Error)
-                               if err != nil {
-                                       return err
-                               }
-                       }
-               }
-       }
+func (plugin Feishu) Init(basicRes core.BasicRes) errors.Error {
+       api.Init(basicRes)
        return nil
 }
 
diff --git a/plugins/gitee/api/init.go b/plugins/gitee/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/gitee/api/init.go
+++ b/plugins/gitee/api/init.go
@@ -21,16 +21,14 @@ import (
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/go-playground/validator/v10"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var vld *validator.Validate
 var connectionHelper *helper.ConnectionApiHelper
 var basicRes core.BasicRes
 
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-       basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+       basicRes = br
        vld = validator.New()
        connectionHelper = helper.NewConnectionHelper(
                basicRes,
diff --git a/plugins/gitee/impl/impl.go b/plugins/gitee/impl/impl.go
index 38d512788..bd9327cf4 100644
--- a/plugins/gitee/impl/impl.go
+++ b/plugins/gitee/impl/impl.go
@@ -28,8 +28,6 @@ import (
        
"github.com/apache/incubator-devlake/plugins/gitee/models/migrationscripts"
        "github.com/apache/incubator-devlake/plugins/gitee/tasks"
        "github.com/apache/incubator-devlake/plugins/helper"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var _ core.PluginMeta = (*Gitee)(nil)
@@ -42,8 +40,8 @@ var _ core.CloseablePluginTask = (*Gitee)(nil)
 
 type Gitee string
 
-func (plugin Gitee) Init(config *viper.Viper, logger core.Logger, db *gorm.DB) 
errors.Error {
-       api.Init(config, logger, db)
+func (plugin Gitee) Init(basicRes core.BasicRes) errors.Error {
+       api.Init(basicRes)
        return nil
 }
 
diff --git a/plugins/gitextractor/main.go b/plugins/gitextractor/main.go
index dbef0dee3..b1fba1ddc 100644
--- a/plugins/gitextractor/main.go
+++ b/plugins/gitextractor/main.go
@@ -22,14 +22,15 @@ import (
        "flag"
 
        "github.com/apache/incubator-devlake/config"
+       rootImpl "github.com/apache/incubator-devlake/impl"
+       "github.com/apache/incubator-devlake/impl/dalgorm"
        "github.com/apache/incubator-devlake/logger"
        "github.com/apache/incubator-devlake/plugins/gitextractor/impl"
        "github.com/apache/incubator-devlake/plugins/gitextractor/models"
        "github.com/apache/incubator-devlake/plugins/gitextractor/store"
        "github.com/apache/incubator-devlake/plugins/gitextractor/tasks"
        "github.com/apache/incubator-devlake/plugins/helper"
-       "gorm.io/driver/mysql"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/runner"
 )
 
 // PluginEntry is a variable exported for Framework to search and load
@@ -42,9 +43,12 @@ func main() {
        user := flag.String("user", "", "-user")
        password := flag.String("password", "", "-password")
        output := flag.String("output", "", "-output")
-       db := flag.String("db", "", "-db")
+       dbUrl := flag.String("db", "", "-db")
        flag.Parse()
+
+       cfg := config.GetConfig()
        log := logger.Global.Nested("git extractor")
+
        var storage models.Store
        var err error
        if *url == "" {
@@ -58,23 +62,22 @@ func main() {
                if err != nil {
                        panic(err)
                }
-       } else if *db != "" {
-               database, err := gorm.Open(mysql.Open(*db))
-               if err != nil {
-                       panic(err)
-               }
-               basicRes := helper.NewDefaultBasicRes(nil, log, database)
-               storage = store.NewDatabase(basicRes, *url)
+       } else if *dbUrl != "" {
+               cfg.Set("DB_URL", *dbUrl)
        } else {
                panic("either specify `-output` or `-db` argument as 
destination")
        }
+       db, err := runner.NewGormDb(cfg, log)
+       if err != nil {
+               panic(err)
+       }
+       basicRes := rootImpl.NewDefaultBasicRes(cfg, log, 
dalgorm.NewDalgorm(db))
+       storage = store.NewDatabase(basicRes, *url)
        defer storage.Close()
        ctx := context.Background()
        subTaskCtx := helper.NewStandaloneSubTaskContext(
                ctx,
-               config.GetConfig(),
-               log,
-               nil,
+               basicRes,
                "git extractor",
                nil,
        )
diff --git a/plugins/gitextractor/parser/repo.go 
b/plugins/gitextractor/parser/repo.go
index 73361f411..937441184 100644
--- a/plugins/gitextractor/parser/repo.go
+++ b/plugins/gitextractor/parser/repo.go
@@ -207,6 +207,7 @@ func (r *GitRepo) CollectCommits(subtaskCtx 
core.SubTaskContext) errors.Error {
        if err != nil {
                return err
        }
+       // TODO: this defeat the csv mode!
        db := subtaskCtx.GetDal()
        components := make([]code.Component, 0)
        err = db.All(&components, dal.From(components), dal.Where("repo_id= ?", 
r.id))
diff --git a/plugins/github/api/init.go b/plugins/github/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/github/api/init.go
+++ b/plugins/github/api/init.go
@@ -21,16 +21,14 @@ import (
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/go-playground/validator/v10"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var vld *validator.Validate
 var connectionHelper *helper.ConnectionApiHelper
 var basicRes core.BasicRes
 
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-       basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+       basicRes = br
        vld = validator.New()
        connectionHelper = helper.NewConnectionHelper(
                basicRes,
diff --git a/plugins/github/impl/impl.go b/plugins/github/impl/impl.go
index b254990df..dcfbf3183 100644
--- a/plugins/github/impl/impl.go
+++ b/plugins/github/impl/impl.go
@@ -19,10 +19,12 @@ package impl
 
 import (
        "fmt"
-       "github.com/apache/incubator-devlake/plugins/core/dal"
        "strings"
        "time"
 
+       "github.com/apache/incubator-devlake/plugins/core/dal"
+       "gorm.io/gorm"
+
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/github/api"
@@ -30,8 +32,6 @@ import (
        
"github.com/apache/incubator-devlake/plugins/github/models/migrationscripts"
        "github.com/apache/incubator-devlake/plugins/github/tasks"
        "github.com/apache/incubator-devlake/plugins/helper"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var _ core.PluginMeta = (*Github)(nil)
@@ -57,8 +57,8 @@ func (plugin Github) TransformationRule() interface{} {
        return &models.GithubTransformationRule{}
 }
 
-func (plugin Github) Init(config *viper.Viper, logger core.Logger, db 
*gorm.DB) errors.Error {
-       api.Init(config, logger, db)
+func (plugin Github) Init(basicRes core.BasicRes) errors.Error {
+       api.Init(basicRes)
        return nil
 }
 
diff --git a/plugins/github_graphql/impl/impl.go 
b/plugins/github_graphql/impl/impl.go
index 2d40445a1..bf36d3bef 100644
--- a/plugins/github_graphql/impl/impl.go
+++ b/plugins/github_graphql/impl/impl.go
@@ -20,11 +20,12 @@ package impl
 import (
        "context"
        "fmt"
-       githubImpl "github.com/apache/incubator-devlake/plugins/github/impl"
        "reflect"
        "strings"
        "time"
 
+       githubImpl "github.com/apache/incubator-devlake/plugins/github/impl"
+
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/github/models"
@@ -32,14 +33,11 @@ import (
        "github.com/apache/incubator-devlake/plugins/github_graphql/tasks"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/merico-dev/graphql"
-       "github.com/spf13/viper"
        "golang.org/x/oauth2"
-       "gorm.io/gorm"
 )
 
 // make sure interface is implemented
 var _ core.PluginMeta = (*GithubGraphql)(nil)
-var _ core.PluginInit = (*GithubGraphql)(nil)
 var _ core.PluginTask = (*GithubGraphql)(nil)
 var _ core.PluginApi = (*GithubGraphql)(nil)
 var _ core.PluginModel = (*GithubGraphql)(nil)
@@ -64,10 +62,6 @@ func (plugin GithubGraphql) Description() string {
        return "collect some GithubGraphql data"
 }
 
-func (plugin GithubGraphql) Init(config *viper.Viper, logger core.Logger, db 
*gorm.DB) errors.Error {
-       return nil
-}
-
 func (plugin GithubGraphql) GetTablesInfo() []core.Tabler {
        return []core.Tabler{}
 }
diff --git a/plugins/gitlab/api/blueprint.go b/plugins/gitlab/api/blueprint.go
index 178fc4fd4..636258202 100644
--- a/plugins/gitlab/api/blueprint.go
+++ b/plugins/gitlab/api/blueprint.go
@@ -54,7 +54,7 @@ func MakePipelinePlan(subtaskMetas []core.SubTaskMeta, 
connectionId uint64, scop
                },
                10*time.Second,
                connection.Proxy,
-               BasicRes,
+               basicRes,
        )
        if err != nil {
                return nil, err
diff --git a/plugins/gitlab/api/blueprint_V200_test.go 
b/plugins/gitlab/api/blueprint_V200_test.go
index 1a8a5b5d8..8511cdadc 100644
--- a/plugins/gitlab/api/blueprint_V200_test.go
+++ b/plugins/gitlab/api/blueprint_V200_test.go
@@ -174,7 +174,7 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
        assert.Equal(t, err, nil)
 
        // Refresh Global Variables and set the sql mock
-       BasicRes = unithelper.DummyBasicRes(func(mockDal *mocks.Dal) {
+       basicRes = unithelper.DummyBasicRes(func(mockDal *mocks.Dal) {
                mockDal.On("First", mock.Anything, mock.Anything).Run(func(args 
mock.Arguments) {
                        dst := args.Get(0).(*models.GitlabConnection)
                        *dst = *testGitlabConnection
@@ -191,7 +191,7 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
                }).Return(nil).Once()
        })
        connectionHelper = helper.NewConnectionHelper(
-               BasicRes,
+               basicRes,
                validator.New(),
        )
 
diff --git a/plugins/gitlab/api/blueprint_v200.go 
b/plugins/gitlab/api/blueprint_v200.go
index 7019cd4d2..b3228fc6d 100644
--- a/plugins/gitlab/api/blueprint_v200.go
+++ b/plugins/gitlab/api/blueprint_v200.go
@@ -179,7 +179,7 @@ func makePipelinePlanV200(subtaskMetas []core.SubTaskMeta, 
scopes []*core.Bluepr
 // GetRepoByConnectionIdAndscopeId get tbe repo by the connectionId and the 
scopeId
 func GetRepoByConnectionIdAndscopeId(connectionId uint64, scopeId string) 
(*models.GitlabProject, errors.Error) {
        repo := &models.GitlabProject{}
-       db := BasicRes.GetDal()
+       db := basicRes.GetDal()
        err := db.First(repo, dal.Where("connection_id = ? AND gitlab_id = ?", 
connectionId, scopeId))
        if err != nil {
                if db.IsErrorNotFound(err) {
@@ -196,7 +196,7 @@ func GetTransformationRuleByRepo(repo 
*models.GitlabProject) (*models.GitlabTran
        transformationRules := &models.GitlabTransformationRule{}
        transformationRuleId := repo.TransformationRuleId
        if transformationRuleId != 0 {
-               db := BasicRes.GetDal()
+               db := basicRes.GetDal()
                err := db.First(transformationRules, dal.Where("id = ?", 
transformationRuleId))
                if err != nil {
                        if db.IsErrorNotFound(err) {
diff --git a/plugins/gitlab/api/connection.go b/plugins/gitlab/api/connection.go
index 394868fb3..7cdfd66dc 100644
--- a/plugins/gitlab/api/connection.go
+++ b/plugins/gitlab/api/connection.go
@@ -53,7 +53,7 @@ func TestConnection(input *core.ApiResourceInput) 
(*core.ApiResourceOutput, erro
                },
                3*time.Second,
                connection.Proxy,
-               BasicRes,
+               basicRes,
        )
        if err != nil {
                return nil, errors.Convert(err)
diff --git a/plugins/gitlab/api/init.go b/plugins/gitlab/api/init.go
index f1fb68f9b..ff2aa65fd 100644
--- a/plugins/gitlab/api/init.go
+++ b/plugins/gitlab/api/init.go
@@ -21,19 +21,17 @@ import (
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/go-playground/validator/v10"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var vld *validator.Validate
 var connectionHelper *helper.ConnectionApiHelper
-var BasicRes core.BasicRes
+var basicRes core.BasicRes
 
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-       BasicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+       basicRes = br
        vld = validator.New()
        connectionHelper = helper.NewConnectionHelper(
-               BasicRes,
+               basicRes,
                vld,
        )
 }
diff --git a/plugins/gitlab/api/proxy.go b/plugins/gitlab/api/proxy.go
index 999cb9bfe..b9c95baef 100644
--- a/plugins/gitlab/api/proxy.go
+++ b/plugins/gitlab/api/proxy.go
@@ -48,7 +48,7 @@ func Proxy(input *core.ApiResourceInput) 
(*core.ApiResourceOutput, errors.Error)
                },
                TimeOut,
                connection.Proxy,
-               BasicRes,
+               basicRes,
        )
        if err != nil {
                return nil, err
diff --git a/plugins/gitlab/api/scope.go b/plugins/gitlab/api/scope.go
index 13f3b6149..db3358460 100644
--- a/plugins/gitlab/api/scope.go
+++ b/plugins/gitlab/api/scope.go
@@ -77,7 +77,7 @@ func PutScope(input *core.ApiResourceInput) 
(*core.ApiResourceOutput, errors.Err
                        return nil, err
                }
        }
-       err = BasicRes.GetDal().CreateOrUpdate(projects.Data)
+       err = basicRes.GetDal().CreateOrUpdate(projects.Data)
        if err != nil {
                return nil, errors.Default.Wrap(err, "error on saving 
GitlabProject")
        }
@@ -102,7 +102,7 @@ func UpdateScope(input *core.ApiResourceInput) 
(*core.ApiResourceOutput, errors.
                return nil, errors.BadInput.New("invalid connectionId or 
projectId")
        }
        var project models.GitlabProject
-       err := BasicRes.GetDal().First(&project, dal.Where("connection_id = ? 
AND gitlab_id = ?", connectionId, projectId))
+       err := basicRes.GetDal().First(&project, dal.Where("connection_id = ? 
AND gitlab_id = ?", connectionId, projectId))
        if err != nil {
                return nil, errors.Default.Wrap(err, "getting GitlabProject 
error")
        }
@@ -114,7 +114,7 @@ func UpdateScope(input *core.ApiResourceInput) 
(*core.ApiResourceOutput, errors.
        if err != nil {
                return nil, err
        }
-       err = BasicRes.GetDal().Update(project)
+       err = basicRes.GetDal().Update(project)
        if err != nil {
                return nil, errors.Default.Wrap(err, "error on saving 
GitlabProject")
        }
@@ -137,7 +137,7 @@ func GetScopeList(input *core.ApiResourceInput) 
(*core.ApiResourceOutput, errors
                return nil, errors.BadInput.New("invalid path params")
        }
        limit, offset := helper.GetLimitOffset(input.Query, "pageSize", "page")
-       err := BasicRes.GetDal().All(&projects, dal.Where("connection_id = ?", 
connectionId), dal.Limit(limit), dal.Offset(offset))
+       err := basicRes.GetDal().All(&projects, dal.Where("connection_id = ?", 
connectionId), dal.Limit(limit), dal.Offset(offset))
        if err != nil {
                return nil, err
        }
@@ -149,7 +149,7 @@ func GetScopeList(input *core.ApiResourceInput) 
(*core.ApiResourceOutput, errors
        }
        var rules []models.GitlabTransformationRule
        if len(ruleIds) > 0 {
-               err = BasicRes.GetDal().All(&rules, dal.Where("id IN (?)", 
ruleIds))
+               err = basicRes.GetDal().All(&rules, dal.Where("id IN (?)", 
ruleIds))
                if err != nil {
                        return nil, err
                }
@@ -183,7 +183,7 @@ func GetScope(input *core.ApiResourceInput) 
(*core.ApiResourceOutput, errors.Err
        if connectionId*projectId == 0 {
                return nil, errors.BadInput.New("invalid path params")
        }
-       err := BasicRes.GetDal().First(&project, dal.Where("connection_id = ? 
AND gitlab_id = ?", connectionId, projectId))
+       err := basicRes.GetDal().First(&project, dal.Where("connection_id = ? 
AND gitlab_id = ?", connectionId, projectId))
        if errors.Is(err, gorm.ErrRecordNotFound) {
                return nil, errors.NotFound.New("record not found")
        }
@@ -192,7 +192,7 @@ func GetScope(input *core.ApiResourceInput) 
(*core.ApiResourceOutput, errors.Err
        }
        var rule models.GitlabTransformationRule
        if project.TransformationRuleId > 0 {
-               err = BasicRes.GetDal().First(&rule, dal.Where("id = ?", 
project.TransformationRuleId))
+               err = basicRes.GetDal().First(&rule, dal.Where("id = ?", 
project.TransformationRuleId))
                if err != nil {
                        return nil, err
                }
diff --git a/plugins/gitlab/api/transformation_rule.go 
b/plugins/gitlab/api/transformation_rule.go
index 78dc08b2b..1c56b46d0 100644
--- a/plugins/gitlab/api/transformation_rule.go
+++ b/plugins/gitlab/api/transformation_rule.go
@@ -45,7 +45,7 @@ func CreateTransformationRule(input *core.ApiResourceInput) 
(*core.ApiResourceOu
        if err != nil {
                return nil, errors.Default.Wrap(err, "error in decoding 
transformation rule")
        }
-       err = BasicRes.GetDal().Create(&rule)
+       err = basicRes.GetDal().Create(&rule)
        if err != nil {
                return nil, errors.Default.Wrap(err, "error on saving 
TransformationRule")
        }
@@ -69,7 +69,7 @@ func UpdateTransformationRule(input *core.ApiResourceInput) 
(*core.ApiResourceOu
                return nil, errors.Default.Wrap(err, "the transformation rule 
ID should be an integer")
        }
        var old models.GitlabTransformationRule
-       err = BasicRes.GetDal().First(&old, dal.Where("id = ?", 
transformationRuleId))
+       err = basicRes.GetDal().First(&old, dal.Where("id = ?", 
transformationRuleId))
        if err != nil {
                return nil, errors.Default.Wrap(err, "error on saving 
TransformationRule")
        }
@@ -78,7 +78,7 @@ func UpdateTransformationRule(input *core.ApiResourceInput) 
(*core.ApiResourceOu
                return nil, errors.Default.Wrap(err, "error decoding map into 
transformationRule")
        }
        old.ID = transformationRuleId
-       err = BasicRes.GetDal().Update(&old, dal.Where("id = ?", 
transformationRuleId))
+       err = basicRes.GetDal().Update(&old, dal.Where("id = ?", 
transformationRuleId))
        if err != nil {
                return nil, errors.Default.Wrap(err, "error on saving 
TransformationRule")
        }
@@ -100,7 +100,7 @@ func GetTransformationRule(input *core.ApiResourceInput) 
(*core.ApiResourceOutpu
                return nil, errors.Default.Wrap(err, "the transformation rule 
ID should be an integer")
        }
        var rule models.GitlabTransformationRule
-       err = BasicRes.GetDal().First(&rule, dal.Where("id = ?", 
transformationRuleId))
+       err = basicRes.GetDal().First(&rule, dal.Where("id = ?", 
transformationRuleId))
        if err != nil {
                return nil, errors.Default.Wrap(err, "error on get 
TransformationRule")
        }
@@ -120,7 +120,7 @@ func GetTransformationRule(input *core.ApiResourceInput) 
(*core.ApiResourceOutpu
 func GetTransformationRuleList(input *core.ApiResourceInput) 
(*core.ApiResourceOutput, errors.Error) {
        var rules []models.GitlabTransformationRule
        limit, offset := helper.GetLimitOffset(input.Query, "pageSize", "page")
-       err := BasicRes.GetDal().All(&rules, dal.Limit(limit), 
dal.Offset(offset))
+       err := basicRes.GetDal().All(&rules, dal.Limit(limit), 
dal.Offset(offset))
        if err != nil {
                return nil, errors.Default.Wrap(err, "error on get 
TransformationRule list")
        }
diff --git a/plugins/gitlab/impl/impl.go b/plugins/gitlab/impl/impl.go
index fac31b7e4..49f5738d0 100644
--- a/plugins/gitlab/impl/impl.go
+++ b/plugins/gitlab/impl/impl.go
@@ -29,9 +29,6 @@ import (
        
"github.com/apache/incubator-devlake/plugins/gitlab/models/migrationscripts"
        "github.com/apache/incubator-devlake/plugins/gitlab/tasks"
        "github.com/apache/incubator-devlake/plugins/helper"
-
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var _ interface {
@@ -48,8 +45,8 @@ var _ interface {
 
 type Gitlab string
 
-func (plugin Gitlab) Init(config *viper.Viper, logger core.Logger, db 
*gorm.DB) errors.Error {
-       api.Init(config, logger, db)
+func (plugin Gitlab) Init(basicRes core.BasicRes) errors.Error {
+       api.Init(basicRes)
        return nil
 }
 
diff --git a/plugins/helper/default_task_context.go 
b/plugins/helper/default_task_context.go
index f9aea6684..b65701d8c 100644
--- a/plugins/helper/default_task_context.go
+++ b/plugins/helper/default_task_context.go
@@ -20,67 +20,20 @@ package helper
 import (
        "context"
        "fmt"
-       "github.com/apache/incubator-devlake/errors"
        "sync"
        "sync/atomic"
        "time"
 
-       "github.com/apache/incubator-devlake/impl/dalgorm"
+       "github.com/apache/incubator-devlake/errors"
+
        "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/apache/incubator-devlake/plugins/core/dal"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
-// bridge to current implementation at this point
-// TODO: implement another TaskContext for distributed runner/worker
-
-// DefaultBasicRes offers a defult BasicRes implementation
-// TODO: move to `impl` package
-type DefaultBasicRes struct {
-       cfg    *viper.Viper
-       logger core.Logger
-       db     *gorm.DB
-       dal    dal.Dal
-}
-
-// GetConfig FIXME ...
-func (c *DefaultBasicRes) GetConfig(name string) string {
-       return c.cfg.GetString(name)
-}
-
-// GetDb FIXME ...
-func (c *DefaultBasicRes) GetDb() *gorm.DB {
-       return c.db
-}
-
-// GetDal FIXME ...
-func (c *DefaultBasicRes) GetDal() dal.Dal {
-       return c.dal
-}
-
-// GetLogger FIXME ...
-func (c *DefaultBasicRes) GetLogger() core.Logger {
-       return c.logger
-}
-
-// NewDefaultBasicRes returns a new DefaultBasicRes instance
-func NewDefaultBasicRes(
-       cfg *viper.Viper,
-       logger core.Logger,
-       db *gorm.DB,
-) *DefaultBasicRes {
-       return &DefaultBasicRes{
-               cfg:    cfg,
-               logger: logger,
-               db:     db,
-               dal:    dalgorm.NewDalgorm(db),
-       }
-}
+// TODO: move this file to `impl` module
 
 // shared by TasContext and SubTaskContext
 type defaultExecContext struct {
-       *DefaultBasicRes
+       core.BasicRes
        ctx      context.Context
        name     string
        data     interface{}
@@ -92,19 +45,13 @@ type defaultExecContext struct {
 
 func newDefaultExecContext(
        ctx context.Context,
-       cfg *viper.Viper,
-       logger core.Logger,
-       db *gorm.DB,
+       basicRes core.BasicRes,
        name string,
        data interface{},
        progress chan core.RunningProgress,
 ) *defaultExecContext {
        return &defaultExecContext{
-               DefaultBasicRes: NewDefaultBasicRes(
-                       cfg,
-                       logger,
-                       db,
-               ),
+               BasicRes: basicRes,
                ctx:      ctx,
                name:     name,
                data:     data,
@@ -158,9 +105,7 @@ func (c *defaultExecContext) IncProgress(progressType 
core.ProgressType, quantit
 func (c *defaultExecContext) fork(name string) *defaultExecContext {
        return newDefaultExecContext(
                c.ctx,
-               c.cfg,
-               c.logger.Nested(name),
-               c.db,
+               c.BasicRes.NestedLogger(name),
                name,
                c.data,
                c.progress,
@@ -177,13 +122,13 @@ type DefaultTaskContext struct {
 // SetProgress FIXME ...
 func (c *DefaultTaskContext) SetProgress(current int, total int) {
        c.defaultExecContext.SetProgress(core.TaskSetProgress, current, total)
-       c.logger.Info("total step: %d", c.total)
+       c.BasicRes.GetLogger().Info("total step: %d", c.total)
 }
 
 // IncProgress FIXME ...
 func (c *DefaultTaskContext) IncProgress(quantity int) {
        c.defaultExecContext.IncProgress(core.TaskIncProgress, quantity)
-       c.logger.Info("finished step: %d / %d", c.current, c.total)
+       c.BasicRes.GetLogger().Info("finished step: %d / %d", c.current, 
c.total)
 }
 
 // DefaultSubTaskContext is default implementation
@@ -197,7 +142,7 @@ type DefaultSubTaskContext struct {
 func (c *DefaultSubTaskContext) SetProgress(current int, total int) {
        c.defaultExecContext.SetProgress(core.SubTaskSetProgress, current, 
total)
        if total > -1 {
-               c.logger.Info("total jobs: %d", c.total)
+               c.BasicRes.GetLogger().Info("total jobs: %d", c.total)
        }
 }
 
@@ -206,24 +151,22 @@ func (c *DefaultSubTaskContext) IncProgress(quantity int) 
{
        c.defaultExecContext.IncProgress(core.SubTaskIncProgress, quantity)
        if c.LastProgressTime.IsZero() || 
c.LastProgressTime.Add(3*time.Second).Before(time.Now()) || c.current%1000 == 0 
{
                c.LastProgressTime = time.Now()
-               c.logger.Info("finished records: %d", c.current)
+               c.BasicRes.GetLogger().Info("finished records: %d", c.current)
        } else {
-               c.logger.Debug("finished records: %d", c.current)
+               c.BasicRes.GetLogger().Debug("finished records: %d", c.current)
        }
 }
 
-// NewDefaultTaskContext FIXME ...
+// NewDefaultTaskContext creates a core.TaskContext backed by DefaultTaskContext.
 func NewDefaultTaskContext(
        ctx context.Context,
-       cfg *viper.Viper,
-       logger core.Logger,
-       db *gorm.DB,
+       basicRes core.BasicRes,
        name string,
        subtasks map[string]bool,
        progress chan core.RunningProgress,
 ) core.TaskContext {
        return &DefaultTaskContext{
-               newDefaultExecContext(ctx, cfg, logger, db, name, nil, 
progress),
+               newDefaultExecContext(ctx, basicRes, name, nil, progress),
                subtasks,
                make(map[string]*DefaultSubTaskContext),
        }
@@ -259,14 +202,12 @@ func (c *DefaultTaskContext) SubTaskContext(subtask 
string) (core.SubTaskContext
 // going through the usual workflow.
 func NewStandaloneSubTaskContext(
        ctx context.Context,
-       cfg *viper.Viper,
-       logger core.Logger,
-       db *gorm.DB,
+       basicRes core.BasicRes,
        name string,
        data interface{},
 ) core.SubTaskContext {
        return &DefaultSubTaskContext{
-               newDefaultExecContext(ctx, cfg, logger, db, name, data, nil),
+               newDefaultExecContext(ctx, basicRes, name, data, nil),
                nil,
                time.Time{},
        }
diff --git a/plugins/icla/impl/impl.go b/plugins/icla/impl/impl.go
index 00900de20..d2e47aa5f 100644
--- a/plugins/icla/impl/impl.go
+++ b/plugins/icla/impl/impl.go
@@ -26,8 +26,6 @@ import (
        "github.com/apache/incubator-devlake/plugins/icla/models"
        
"github.com/apache/incubator-devlake/plugins/icla/models/migrationscripts"
        "github.com/apache/incubator-devlake/plugins/icla/tasks"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 // make sure interface is implemented
@@ -45,7 +43,7 @@ func (plugin Icla) Description() string {
        return "collect some Icla data"
 }
 
-func (plugin Icla) Init(config *viper.Viper, logger core.Logger, db *gorm.DB) 
errors.Error {
+func (plugin Icla) Init(basicRes core.BasicRes) errors.Error {
        return nil
 }
 
diff --git a/plugins/jenkins/api/init.go b/plugins/jenkins/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/jenkins/api/init.go
+++ b/plugins/jenkins/api/init.go
@@ -21,16 +21,14 @@ import (
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/go-playground/validator/v10"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var vld *validator.Validate
 var connectionHelper *helper.ConnectionApiHelper
 var basicRes core.BasicRes
 
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-       basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+       basicRes = br
        vld = validator.New()
        connectionHelper = helper.NewConnectionHelper(
                basicRes,
diff --git a/plugins/jenkins/impl/impl.go b/plugins/jenkins/impl/impl.go
index 49a28b884..278906673 100644
--- a/plugins/jenkins/impl/impl.go
+++ b/plugins/jenkins/impl/impl.go
@@ -19,10 +19,11 @@ package impl
 
 import (
        "fmt"
-       "github.com/apache/incubator-devlake/plugins/core/dal"
        "strings"
        "time"
 
+       "github.com/apache/incubator-devlake/plugins/core/dal"
+
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
@@ -30,8 +31,6 @@ import (
        "github.com/apache/incubator-devlake/plugins/jenkins/models"
        
"github.com/apache/incubator-devlake/plugins/jenkins/models/migrationscripts"
        "github.com/apache/incubator-devlake/plugins/jenkins/tasks"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var _ core.PluginMeta = (*Jenkins)(nil)
@@ -45,8 +44,8 @@ var _ core.PluginSource = (*Jenkins)(nil)
 
 type Jenkins struct{}
 
-func (plugin Jenkins) Init(config *viper.Viper, logger core.Logger, db 
*gorm.DB) errors.Error {
-       api.Init(config, logger, db)
+func (plugin Jenkins) Init(basicRes core.BasicRes) errors.Error {
+       api.Init(basicRes)
        return nil
 }
 
diff --git a/plugins/jira/api/init.go b/plugins/jira/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/jira/api/init.go
+++ b/plugins/jira/api/init.go
@@ -21,16 +21,14 @@ import (
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/go-playground/validator/v10"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var vld *validator.Validate
 var connectionHelper *helper.ConnectionApiHelper
 var basicRes core.BasicRes
 
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-       basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+       basicRes = br
        vld = validator.New()
        connectionHelper = helper.NewConnectionHelper(
                basicRes,
diff --git a/plugins/jira/impl/impl.go b/plugins/jira/impl/impl.go
index 67fe3f87e..7a9cdf013 100644
--- a/plugins/jira/impl/impl.go
+++ b/plugins/jira/impl/impl.go
@@ -18,7 +18,6 @@ limitations under the License.
 package impl
 
 import (
-       goerror "errors"
        "fmt"
        "net/http"
        "time"
@@ -31,8 +30,6 @@ import (
        "github.com/apache/incubator-devlake/plugins/jira/models"
        
"github.com/apache/incubator-devlake/plugins/jira/models/migrationscripts"
        "github.com/apache/incubator-devlake/plugins/jira/tasks"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var _ core.PluginMeta = (*Jira)(nil)
@@ -60,8 +57,8 @@ func (plugin Jira) TransformationRule() interface{} {
        return &models.JiraTransformationRule{}
 }
 
-func (plugin *Jira) Init(config *viper.Viper, logger core.Logger, db *gorm.DB) 
errors.Error {
-       api.Init(config, logger, db)
+func (plugin *Jira) Init(basicRes core.BasicRes) errors.Error {
+       api.Init(basicRes)
        return nil
 }
 
@@ -184,7 +181,7 @@ func (plugin Jira) PrepareTaskData(taskCtx 
core.TaskContext, options map[string]
        if op.TransformationRules == nil && op.TransformationRuleId != 0 {
                var transformationRule models.JiraTransformationRule
                err = taskCtx.GetDal().First(&transformationRule, dal.Where("id 
= ?", op.TransformationRuleId))
-               if err != nil && !goerror.Is(err, gorm.ErrRecordNotFound) {
+               if err != nil && !db.IsErrorNotFound(err) {
                        return nil, errors.BadInput.Wrap(err, "fail to get 
transformationRule")
                }
                op.TransformationRules, err = 
tasks.MakeTransformationRules(transformationRule)
diff --git a/plugins/org/api/handlers.go b/plugins/org/api/handlers.go
index 95d7c4103..8596b6476 100644
--- a/plugins/org/api/handlers.go
+++ b/plugins/org/api/handlers.go
@@ -19,11 +19,11 @@ package api
 
 import (
        "encoding/csv"
-       "github.com/apache/incubator-devlake/errors"
        "net/http"
 
+       "github.com/apache/incubator-devlake/errors"
+
        "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/apache/incubator-devlake/plugins/core/dal"
        "github.com/gocarina/gocsv"
 )
 
@@ -33,8 +33,8 @@ type Handlers struct {
        store store
 }
 
-func NewHandlers(db dal.Dal, basicRes core.BasicRes) *Handlers {
-       return &Handlers{store: NewDbStore(db, basicRes)}
+func NewHandlers(basicRes core.BasicRes) *Handlers {
+       return &Handlers{store: NewDbStore(basicRes.GetDal(), basicRes)}
 }
 
 func (h *Handlers) unmarshal(r *http.Request, items interface{}) errors.Error {
diff --git a/plugins/org/impl/impl.go b/plugins/org/impl/impl.go
index 76ed1ba9b..902eca8f1 100644
--- a/plugins/org/impl/impl.go
+++ b/plugins/org/impl/impl.go
@@ -19,13 +19,10 @@ package impl
 
 import (
        "github.com/apache/incubator-devlake/errors"
-       "github.com/apache/incubator-devlake/impl/dalgorm"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/apache/incubator-devlake/plugins/org/api"
        "github.com/apache/incubator-devlake/plugins/org/tasks"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var _ core.PluginMeta = (*Org)(nil)
@@ -37,9 +34,8 @@ type Org struct {
        handlers *api.Handlers
 }
 
-func (plugin *Org) Init(config *viper.Viper, logger core.Logger, db *gorm.DB) 
errors.Error {
-       basicRes := helper.NewDefaultBasicRes(config, logger, db)
-       plugin.handlers = api.NewHandlers(dalgorm.NewDalgorm(db), basicRes)
+func (plugin *Org) Init(basicRes core.BasicRes) errors.Error {
+       plugin.handlers = api.NewHandlers(basicRes)
        return nil
 }
 
diff --git a/plugins/pagerduty/api/init.go b/plugins/pagerduty/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/pagerduty/api/init.go
+++ b/plugins/pagerduty/api/init.go
@@ -21,16 +21,14 @@ import (
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/go-playground/validator/v10"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var vld *validator.Validate
 var connectionHelper *helper.ConnectionApiHelper
 var basicRes core.BasicRes
 
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-       basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+       basicRes = br
        vld = validator.New()
        connectionHelper = helper.NewConnectionHelper(
                basicRes,
diff --git a/plugins/pagerduty/impl/impl.go b/plugins/pagerduty/impl/impl.go
index dcaabb3e8..1c43f519f 100644
--- a/plugins/pagerduty/impl/impl.go
+++ b/plugins/pagerduty/impl/impl.go
@@ -19,6 +19,8 @@ package impl
 
 import (
        "fmt"
+       "time"
+
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/tap"
        "github.com/apache/incubator-devlake/plugins/core"
@@ -27,9 +29,6 @@ import (
        "github.com/apache/incubator-devlake/plugins/pagerduty/models"
        
"github.com/apache/incubator-devlake/plugins/pagerduty/models/migrationscripts"
        "github.com/apache/incubator-devlake/plugins/pagerduty/tasks"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
-       "time"
 )
 
 // make sure interface is implemented
@@ -46,8 +45,8 @@ func (plugin PagerDuty) Description() string {
        return "collect some PagerDuty data"
 }
 
-func (plugin PagerDuty) Init(config *viper.Viper, logger core.Logger, db 
*gorm.DB) errors.Error {
-       api.Init(config, logger, db)
+func (plugin PagerDuty) Init(basicRes core.BasicRes) errors.Error {
+       api.Init(basicRes)
        return nil
 }
 
diff --git a/plugins/refdiff/impl/impl.go b/plugins/refdiff/impl/impl.go
index 1007b2d38..c81bc137f 100644
--- a/plugins/refdiff/impl/impl.go
+++ b/plugins/refdiff/impl/impl.go
@@ -22,13 +22,10 @@ import (
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/apache/incubator-devlake/plugins/refdiff/tasks"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 // make sure interface is implemented
 var _ core.PluginMeta = (*RefDiff)(nil)
-var _ core.PluginInit = (*RefDiff)(nil)
 var _ core.PluginTask = (*RefDiff)(nil)
 var _ core.PluginApi = (*RefDiff)(nil)
 var _ core.PluginModel = (*RefDiff)(nil)
@@ -63,10 +60,6 @@ func (plugin RefDiff) Settings() interface{} {
        return nil
 }
 
-func (plugin RefDiff) Init(config *viper.Viper, logger core.Logger, db 
*gorm.DB) errors.Error {
-       return nil
-}
-
 func (plugin RefDiff) SubTaskMetas() []core.SubTaskMeta {
        return []core.SubTaskMeta{
                tasks.CalculateCommitsDiffMeta,
diff --git a/plugins/tapd/api/init.go b/plugins/tapd/api/init.go
index ef6c55f79..ff2aa65fd 100644
--- a/plugins/tapd/api/init.go
+++ b/plugins/tapd/api/init.go
@@ -21,20 +21,14 @@ import (
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/go-playground/validator/v10"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
-var db *gorm.DB
-var cfg *viper.Viper
 var vld *validator.Validate
 var connectionHelper *helper.ConnectionApiHelper
 var basicRes core.BasicRes
 
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-       db = database
-       cfg = config
-       basicRes = helper.NewDefaultBasicRes(cfg, logger, db)
+func Init(br core.BasicRes) {
+       basicRes = br
        vld = validator.New()
        connectionHelper = helper.NewConnectionHelper(
                basicRes,
diff --git a/plugins/tapd/impl/impl.go b/plugins/tapd/impl/impl.go
index c72855203..a29cdb46d 100644
--- a/plugins/tapd/impl/impl.go
+++ b/plugins/tapd/impl/impl.go
@@ -29,8 +29,6 @@ import (
        "github.com/apache/incubator-devlake/plugins/tapd/models"
        
"github.com/apache/incubator-devlake/plugins/tapd/models/migrationscripts"
        "github.com/apache/incubator-devlake/plugins/tapd/tasks"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var _ core.PluginMeta = (*Tapd)(nil)
@@ -43,8 +41,8 @@ var _ core.CloseablePluginTask = (*Tapd)(nil)
 
 type Tapd struct{}
 
-func (plugin Tapd) Init(config *viper.Viper, logger core.Logger, db *gorm.DB) 
errors.Error {
-       api.Init(config, logger, db)
+func (plugin Tapd) Init(basicRes core.BasicRes) errors.Error {
+       api.Init(basicRes)
        return nil
 }
 
diff --git a/plugins/webhook/api/init.go b/plugins/webhook/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/webhook/api/init.go
+++ b/plugins/webhook/api/init.go
@@ -21,16 +21,14 @@ import (
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/go-playground/validator/v10"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var vld *validator.Validate
 var connectionHelper *helper.ConnectionApiHelper
 var basicRes core.BasicRes
 
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-       basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+       basicRes = br
        vld = validator.New()
        connectionHelper = helper.NewConnectionHelper(
                basicRes,
diff --git a/plugins/webhook/impl/impl.go b/plugins/webhook/impl/impl.go
index b3dfdcfc6..9cbd822a1 100644
--- a/plugins/webhook/impl/impl.go
+++ b/plugins/webhook/impl/impl.go
@@ -22,8 +22,6 @@ import (
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/webhook/api"
        
"github.com/apache/incubator-devlake/plugins/webhook/models/migrationscripts"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 // make sure interface is implemented
@@ -39,8 +37,8 @@ func (plugin Webhook) Description() string {
        return "collect some Webhook data"
 }
 
-func (plugin Webhook) Init(config *viper.Viper, logger core.Logger, db 
*gorm.DB) errors.Error {
-       api.Init(config, logger, db)
+func (plugin Webhook) Init(basicRes core.BasicRes) errors.Error {
+       api.Init(basicRes)
        return nil
 }
 
diff --git a/plugins/zentao/api/init.go b/plugins/zentao/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/zentao/api/init.go
+++ b/plugins/zentao/api/init.go
@@ -21,16 +21,14 @@ import (
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/go-playground/validator/v10"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 var vld *validator.Validate
 var connectionHelper *helper.ConnectionApiHelper
 var basicRes core.BasicRes
 
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-       basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+       basicRes = br
        vld = validator.New()
        connectionHelper = helper.NewConnectionHelper(
                basicRes,
diff --git a/plugins/zentao/impl/impl.go b/plugins/zentao/impl/impl.go
index 8b10c1e9e..1c6a0b625 100644
--- a/plugins/zentao/impl/impl.go
+++ b/plugins/zentao/impl/impl.go
@@ -19,6 +19,7 @@ package impl
 
 import (
        "fmt"
+
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
@@ -26,8 +27,6 @@ import (
        "github.com/apache/incubator-devlake/plugins/zentao/models"
        
"github.com/apache/incubator-devlake/plugins/zentao/models/migrationscripts"
        "github.com/apache/incubator-devlake/plugins/zentao/tasks"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 // make sure interface is implemented
@@ -44,8 +43,8 @@ func (plugin Zentao) Description() string {
        return "collect some Zentao data"
 }
 
-func (plugin Zentao) Init(config *viper.Viper, logger core.Logger, db 
*gorm.DB) errors.Error {
-       api.Init(config, logger, db)
+func (plugin Zentao) Init(basicRes core.BasicRes) errors.Error {
+       api.Init(basicRes)
        return nil
 }
 
diff --git a/plugins/azure/api/init.go b/runner/basic_res.go
similarity index 51%
copy from plugins/azure/api/init.go
copy to runner/basic_res.go
index 6774e1482..8bd740fe6 100644
--- a/plugins/azure/api/init.go
+++ b/runner/basic_res.go
@@ -15,25 +15,31 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package api
+package runner
 
 import (
+       "github.com/apache/incubator-devlake/config"
+       "github.com/apache/incubator-devlake/impl"
+       "github.com/apache/incubator-devlake/impl/dalgorm"
+       "github.com/apache/incubator-devlake/logger"
        "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/apache/incubator-devlake/plugins/helper"
-       "github.com/go-playground/validator/v10"
        "github.com/spf13/viper"
        "gorm.io/gorm"
 )
 
-var vld *validator.Validate
-var connectionHelper *helper.ConnectionApiHelper
-var basicRes core.BasicRes
+// CreateAppBasicRes returns an application-level BasicRes instance based on 
.env/environment variables
+// it is useful because multiple places need BasicRes, including `main.go`, 
`directrun` and `worker`
+func CreateAppBasicRes() core.BasicRes {
+       cfg := config.GetConfig()
+       log := logger.Global
+       db, err := NewGormDb(cfg, logger.Global)
+       if err != nil {
+               panic(err)
+       }
+       return CreateBasicRes(cfg, log, db)
+}
 
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-       basicRes = helper.NewDefaultBasicRes(config, logger, database)
-       vld = validator.New()
-       connectionHelper = helper.NewConnectionHelper(
-               basicRes,
-               vld,
-       )
+// CreateBasicRes returns a BasicRes based on what was given
+func CreateBasicRes(cfg *viper.Viper, log core.Logger, db *gorm.DB) 
core.BasicRes {
+       return impl.NewDefaultBasicRes(cfg, log, dalgorm.NewDalgorm(db))
 }
diff --git a/runner/directrun.go b/runner/directrun.go
index 8b805743d..7794aa88c 100644
--- a/runner/directrun.go
+++ b/runner/directrun.go
@@ -19,6 +19,7 @@ package runner
 
 import (
        "context"
+       "encoding/json"
        goerror "errors"
        "fmt"
        "io"
@@ -27,10 +28,7 @@ import (
        "runtime"
        "syscall"
 
-       "github.com/apache/incubator-devlake/config"
-       "github.com/apache/incubator-devlake/impl"
-       "github.com/apache/incubator-devlake/impl/dalgorm"
-       "github.com/apache/incubator-devlake/logger"
+       "github.com/apache/incubator-devlake/models"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/spf13/cobra"
 )
@@ -50,18 +48,13 @@ func RunCmd(cmd *cobra.Command) {
 // pluginTask: specific built-in plugin, for example: feishu, jira...
 // options: plugin config
 func DirectRun(cmd *cobra.Command, args []string, pluginTask core.PluginTask, 
options map[string]interface{}) {
+       basicRes := CreateAppBasicRes()
        tasks, err := cmd.Flags().GetStringSlice("subtasks")
        if err != nil {
                panic(err)
        }
-       cfg := config.GetConfig()
-       log := logger.Global.Nested(cmd.Use)
-       db, err := NewGormDb(cfg, log)
-       if err != nil {
-               panic(err)
-       }
        if pluginInit, ok := pluginTask.(core.PluginInit); ok {
-               err = pluginInit.Init(cfg, log, db)
+               err = pluginInit.Init(basicRes)
                if err != nil {
                        panic(err)
                }
@@ -73,7 +66,7 @@ func DirectRun(cmd *cobra.Command, args []string, pluginTask 
core.PluginTask, op
        }
 
        // collect migration and run
-       migrator, err := InitMigrator(impl.NewDefaultBasicRes(cfg, log, 
dalgorm.NewDalgorm(db)))
+       migrator, err := InitMigrator(basicRes)
        if err != nil {
                panic(err)
        }
@@ -85,16 +78,23 @@ func DirectRun(cmd *cobra.Command, args []string, 
pluginTask core.PluginTask, op
                panic(err)
        }
        ctx := createContext()
-       var parentTaskID uint64
+       optionsJson, err := json.Marshal(options)
+       if err != nil {
+               panic(err)
+       }
+       subtasksJson, err := json.Marshal(tasks)
+       if err != nil {
+               panic(err)
+       }
+       task := &models.Task{
+               Plugin:   cmd.Use,
+               Options:  optionsJson,
+               Subtasks: subtasksJson,
+       }
        err = RunPluginSubTasks(
                ctx,
-               cfg,
-               log,
-               db,
-               parentTaskID,
-               cmd.Use,
-               tasks,
-               options,
+               basicRes,
+               task,
                pluginTask,
                nil,
        )
diff --git a/runner/loader.go b/runner/loader.go
index ff50ff4c9..20c26f1e6 100644
--- a/runner/loader.go
+++ b/runner/loader.go
@@ -26,12 +26,11 @@ import (
        "strings"
 
        "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
 // LoadPlugins load plugins from local directory
-func LoadPlugins(pluginsDir string, config *viper.Viper, logger core.Logger, 
db *gorm.DB) errors.Error {
+func LoadPlugins(basicRes core.BasicRes) errors.Error {
+       pluginsDir := basicRes.GetConfig("PLUGIN_DIR")
        walkErr := filepath.WalkDir(pluginsDir, func(path string, d 
fs.DirEntry, err error) error {
                if err != nil {
                        return err
@@ -52,7 +51,7 @@ func LoadPlugins(pluginsDir string, config *viper.Viper, 
logger core.Logger, db
                                return errors.Default.New(fmt.Sprintf("%s 
PluginEntry must implement PluginMeta interface", pluginName))
                        }
                        if plugin, ok := symPluginEntry.(core.PluginInit); ok {
-                               err = plugin.Init(config, logger, db)
+                               err = plugin.Init(basicRes)
                                if err != nil {
                                        return err
                                }
@@ -62,7 +61,7 @@ func LoadPlugins(pluginsDir string, config *viper.Viper, 
logger core.Logger, db
                                return nil
                        }
 
-                       logger.Info(`plugin loaded %s`, pluginName)
+                       basicRes.GetLogger().Info(`plugin loaded %s`, 
pluginName)
                }
                return nil
        })
diff --git a/runner/run_pipeline.go b/runner/run_pipeline.go
index 32b85d69a..f7d6f491e 100644
--- a/runner/run_pipeline.go
+++ b/runner/run_pipeline.go
@@ -23,23 +23,25 @@ import (
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/models"
        "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
 )
 
 // RunPipeline FIXME ...
 func RunPipeline(
-       _ *viper.Viper,
-       log core.Logger,
-       db *gorm.DB,
+       basicRes core.BasicRes,
        pipelineId uint64,
        runTasks func([]uint64) errors.Error,
 ) errors.Error {
        // load tasks for pipeline
+       db := basicRes.GetDal()
        var tasks []models.Task
-       err := db.Where("pipeline_id = ? AND status = ?", pipelineId, 
models.TASK_CREATED).Order("pipeline_row, pipeline_col").Find(&tasks).Error
+       err := db.All(
+               &tasks,
+               dal.Where("pipeline_id = ? AND status = ?", pipelineId, 
models.TASK_CREATED),
+               dal.Orderby("pipeline_row, pipeline_col"),
+       )
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
        taskIds := make([][]uint64, 0)
        for _, task := range tasks {
@@ -48,31 +50,32 @@ func RunPipeline(
                }
                taskIds[task.PipelineRow-1] = 
append(taskIds[task.PipelineRow-1], task.ID)
        }
-       return runPipelineTasks(log, db, pipelineId, taskIds, runTasks)
+       return runPipelineTasks(basicRes, pipelineId, taskIds, runTasks)
 }
 
 func runPipelineTasks(
-       log core.Logger,
-       db *gorm.DB,
+       basicRes core.BasicRes,
        pipelineId uint64,
        taskIds [][]uint64,
        runTasks func([]uint64) errors.Error,
 ) errors.Error {
+       db := basicRes.GetDal()
+       log := basicRes.GetLogger()
        // load pipeline from db
        dbPipeline := &models.DbPipeline{}
-       err := db.Find(dbPipeline, pipelineId).Error
+       err := db.First(dbPipeline, dal.Where("id = ?", pipelineId))
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
 
        // This double for loop executes each set of tasks sequentially while
        // executing the set of tasks concurrently.
        for i, row := range taskIds {
                // update stage
-               err = db.Model(dbPipeline).Updates(map[string]interface{}{
-                       "status": models.TASK_RUNNING,
-                       "stage":  i + 1,
-               }).Error
+               err = db.UpdateColumns(dbPipeline, []dal.DalSet{
+                       {ColumnName: "status", Value: models.TASK_RUNNING},
+                       {ColumnName: "stage", Value: i + 1},
+               })
                if err != nil {
                        log.Error(err, "update pipeline state failed")
                        break
@@ -81,18 +84,16 @@ func runPipelineTasks(
                err = runTasks(row)
                if err != nil {
                        log.Error(err, "run tasks failed")
-                       return errors.Convert(err)
+                       return err
                }
 
                // update finishedTasks
-               err = db.Model(dbPipeline).Updates(map[string]interface{}{
-                       "finished_tasks": gorm.Expr("finished_tasks + ?", 
len(row)),
-               }).Error
+               err = db.UpdateColumn(dbPipeline, "finished_tasks", 
dal.Expr("finished_tasks + ?", len(row)))
                if err != nil {
                        log.Error(err, "update pipeline state failed")
-                       return errors.Convert(err)
+                       return err
                }
        }
        log.Info("pipeline finished in %d ms: %v", 
time.Now().UnixMilli()-dbPipeline.BeganAt.UnixMilli(), err)
-       return errors.Convert(err)
+       return err
 }
diff --git a/runner/run_task.go b/runner/run_task.go
index 5a59d4f9f..09528a6cc 100644
--- a/runner/run_task.go
+++ b/runner/run_task.go
@@ -19,45 +19,42 @@ package runner
 
 import (
        "context"
-       "encoding/json"
        "fmt"
+       "time"
+
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/logger"
-       "time"
 
-       "github.com/apache/incubator-devlake/config"
        "github.com/apache/incubator-devlake/models"
        "github.com/apache/incubator-devlake/utils"
-       "github.com/spf13/viper"
-       "gorm.io/gorm"
 
        "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
        "github.com/apache/incubator-devlake/plugins/helper"
 )
 
 // RunTask FIXME ...
 func RunTask(
        ctx context.Context,
-       _ *viper.Viper,
-       parentLogger core.Logger,
-       db *gorm.DB,
+       basicRes core.BasicRes,
        progress chan core.RunningProgress,
        taskId uint64,
 ) (err errors.Error) {
+       db := basicRes.GetDal()
        task := &models.Task{}
-       if err := db.Find(task, taskId).Error; err != nil {
-               return errors.Convert(err)
+       if err := db.First(task, dal.Where("id = ?", taskId)); err != nil {
+               return err
        }
        if task.Status == models.TASK_COMPLETED {
                return errors.Default.New("invalid task status")
        }
        dbPipeline := &models.DbPipeline{}
-       if err := db.Find(dbPipeline, task.PipelineId).Error; err != nil {
-               return errors.Convert(err)
+       if err := db.First(dbPipeline, dal.Where("id = ? ", task.PipelineId)); 
err != nil {
+               return err
        }
-       log, err := getTaskLogger(parentLogger, task)
+       log, err := getTaskLogger(basicRes.GetLogger(), task)
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
        beganAt := time.Now()
        // make sure task status always correct even if it panicked
@@ -86,23 +83,23 @@ func RunTask(
                        } else {
                                lakeErr = errors.Convert(err)
                        }
-                       dbe := db.Model(task).Updates(map[string]interface{}{
-                               "status":          models.TASK_FAILED,
-                               "message":         lakeErr.Messages().Format(),
-                               "finished_at":     finishedAt,
-                               "spent_seconds":   spentSeconds,
-                               "failed_sub_task": subTaskName,
-                       }).Error
+                       dbe := db.UpdateColumns(task, []dal.DalSet{
+                               {ColumnName: "status", Value: 
models.TASK_FAILED},
+                               {ColumnName: "message", Value: 
lakeErr.Messages().Format()},
+                               {ColumnName: "finished_at", Value: finishedAt},
+                               {ColumnName: "spent_seconds", Value: 
spentSeconds},
+                               {ColumnName: "failed_sub_task", Value: 
subTaskName},
+                       })
                        if dbe != nil {
                                log.Error(err, "failed to finalize task status 
into db (task failed)")
                        }
                } else {
-                       dbe := db.Model(task).Updates(map[string]interface{}{
-                               "status":        models.TASK_COMPLETED,
-                               "message":       "",
-                               "finished_at":   finishedAt,
-                               "spent_seconds": spentSeconds,
-                       }).Error
+                       dbe := db.UpdateColumns(task, []dal.DalSet{
+                               {ColumnName: "status", Value: 
models.TASK_COMPLETED},
+                               {ColumnName: "message", Value: ""},
+                               {ColumnName: "finished_at", Value: finishedAt},
+                               {ColumnName: "spent_seconds", Value: 
spentSeconds},
+                       })
                        if dbe != nil {
                                log.Error(err, "failed to finalize task status 
into db (task succeeded)")
                        }
@@ -111,34 +108,19 @@ func RunTask(
 
        // start execution
        log.Info("start executing task: %d", task.ID)
-       if err := db.Model(task).Updates(map[string]interface{}{
-               "status":   models.TASK_RUNNING,
-               "message":  "",
-               "began_at": beganAt,
-       }).Error; err != nil {
-               return errors.Convert(err)
-       }
-
-       var options map[string]interface{}
-       err = errors.Convert(json.Unmarshal(task.Options, &options))
-       if err != nil {
-               return err
-       }
-       var subtasks []string
-       err = errors.Convert(json.Unmarshal(task.Subtasks, &subtasks))
-       if err != nil {
-               return err
+       dbe := db.UpdateColumns(task, []dal.DalSet{
+               {ColumnName: "status", Value: models.TASK_RUNNING},
+               {ColumnName: "message", Value: ""},
+               {ColumnName: "began_at", Value: beganAt},
+       })
+       if dbe != nil {
+               return dbe
        }
 
        err = RunPluginTask(
                ctx,
-               config.GetConfig(),
-               log.Nested(task.Plugin),
-               db,
-               task.ID,
-               task.Plugin,
-               subtasks,
-               options,
+               basicRes.ReplaceLogger(log),
+               task,
                progress,
        )
        if dbPipeline.SkipOnFail {
@@ -150,32 +132,22 @@ func RunTask(
 // RunPluginTask FIXME ...
 func RunPluginTask(
        ctx context.Context,
-       cfg *viper.Viper,
-       log core.Logger,
-       db *gorm.DB,
-       taskID uint64,
-       name string,
-       subtasks []string,
-       options map[string]interface{},
+       basicRes core.BasicRes,
+       task *models.Task,
        progress chan core.RunningProgress,
 ) errors.Error {
-       pluginMeta, err := core.GetPlugin(name)
+       pluginMeta, err := core.GetPlugin(task.Plugin)
        if err != nil {
                return errors.Default.WrapRaw(err)
        }
        pluginTask, ok := pluginMeta.(core.PluginTask)
        if !ok {
-               return errors.Default.New(fmt.Sprintf("plugin %s doesn't 
support PluginTask interface", name))
+               return errors.Default.New(fmt.Sprintf("plugin %s doesn't 
support PluginTask interface", task.Plugin))
        }
        return RunPluginSubTasks(
                ctx,
-               cfg,
-               log,
-               db,
-               taskID,
-               name,
-               subtasks,
-               options,
+               basicRes,
+               task,
                pluginTask,
                progress,
        )
@@ -184,16 +156,12 @@ func RunPluginTask(
 // RunPluginSubTasks FIXME ...
 func RunPluginSubTasks(
        ctx context.Context,
-       cfg *viper.Viper,
-       log core.Logger,
-       db *gorm.DB,
-       taskID uint64,
-       name string,
-       subtaskNames []string,
-       options map[string]interface{},
+       basicRes core.BasicRes,
+       task *models.Task,
        pluginTask core.PluginTask,
        progress chan core.RunningProgress,
 ) errors.Error {
+       log := basicRes.GetLogger()
        log.Info("start plugin")
        // find out all possible subtasks this plugin can offer
        subtaskMetas := pluginTask.SubTaskMetas()
@@ -210,6 +178,10 @@ func RunPluginSubTasks(
        */
 
        // user specifies what subtasks to run
+       subtaskNames, err := task.GetSubTasks()
+       if err != nil {
+               return err
+       }
        if len(subtaskNames) != 0 {
                // decode user specified subtasks
                var specifiedTasks []string
@@ -248,13 +220,17 @@ func RunPluginSubTasks(
                }
        }
 
-       taskCtx := helper.NewDefaultTaskContext(ctx, cfg, log, db, name, 
subtasksFlag, progress)
+       taskCtx := helper.NewDefaultTaskContext(ctx, basicRes, task.Plugin, 
subtasksFlag, progress)
        if closeablePlugin, ok := pluginTask.(core.CloseablePluginTask); ok {
                defer closeablePlugin.Close(taskCtx)
        }
+       options, err := task.GetOptions()
+       if err != nil {
+               return err
+       }
        taskData, err := pluginTask.PrepareTaskData(taskCtx, options)
        if err != nil {
-               return errors.Default.Wrap(err, fmt.Sprintf("error preparing 
task data for %s", name))
+               return errors.Default.Wrap(err, fmt.Sprintf("error preparing 
task data for %s", task.Plugin))
        }
        taskCtx.SetData(taskData)
 
@@ -282,7 +258,7 @@ func RunPluginSubTasks(
                                SubTaskNumber: subtaskNumber,
                        }
                }
-               err = runSubtask(log, db, taskID, subtaskNumber, subtaskCtx, 
subtaskMeta.EntryPoint)
+               err = runSubtask(basicRes, subtaskCtx, task.ID, subtaskNumber, 
subtaskMeta.EntryPoint)
                if err != nil {
                        err = errors.SubtaskErr.Wrap(err, fmt.Sprintf("subtask 
%s ended unexpectedly", subtaskMeta.Name), errors.WithData(&subtaskMeta))
                        log.Error(err, "")
@@ -295,7 +271,7 @@ func RunPluginSubTasks(
 }
 
 // UpdateProgressDetail FIXME ...
-func UpdateProgressDetail(db *gorm.DB, log core.Logger, taskId uint64, 
progressDetail *models.TaskProgressDetail, p *core.RunningProgress) {
+func UpdateProgressDetail(basicRes core.BasicRes, taskId uint64, 
progressDetail *models.TaskProgressDetail, p *core.RunningProgress) {
        task := &models.Task{}
        task.ID = taskId
        switch p.Type {
@@ -306,9 +282,9 @@ func UpdateProgressDetail(db *gorm.DB, log core.Logger, 
taskId uint64, progressD
                progressDetail.FinishedSubTasks = p.Current
                // TODO: get rid of db update
                pct := float32(p.Current) / float32(p.Total)
-               err := db.Model(task).Update("progress", pct).Error
+               err := basicRes.GetDal().UpdateColumn(task, "progress", pct)
                if err != nil {
-                       log.Error(err, "failed to update progress")
+                       basicRes.GetLogger().Error(err, "failed to update 
progress")
                }
        case core.SubTaskSetProgress:
                progressDetail.TotalRecords = p.Total
@@ -322,11 +298,10 @@ func UpdateProgressDetail(db *gorm.DB, log core.Logger, 
taskId uint64, progressD
 }
 
 func runSubtask(
-       log core.Logger,
-       db *gorm.DB,
+       basicRes core.BasicRes,
+       ctx core.SubTaskContext,
        parentID uint64,
        subtaskNumber int,
-       ctx core.SubTaskContext,
        entryPoint core.SubTaskEntryPoint,
 ) errors.Error {
        beginAt := time.Now()
@@ -340,14 +315,14 @@ func runSubtask(
                finishedAt := time.Now()
                subtask.FinishedAt = &finishedAt
                subtask.SpentSeconds = finishedAt.Unix() - beginAt.Unix()
-               recordSubtask(log, db, subtask)
+               recordSubtask(basicRes, subtask)
        }()
        return entryPoint(ctx)
 }
 
-func recordSubtask(log core.Logger, db *gorm.DB, subtask *models.Subtask) {
-       if err := db.Create(&subtask).Error; err != nil {
-               log.Error(err, "error writing subtask %d status to DB: %v", 
subtask.ID)
+func recordSubtask(basicRes core.BasicRes, subtask *models.Subtask) {
+       if err := basicRes.GetDal().Create(subtask); err != nil {
+               basicRes.GetLogger().Error(err, "error writing subtask %d 
status to DB: %v", subtask.ID)
        }
 }
 
diff --git a/scripts/compile-plugins.sh b/scripts/compile-plugins.sh
index b2ea0f06b..ea220924e 100755
--- a/scripts/compile-plugins.sh
+++ b/scripts/compile-plugins.sh
@@ -20,21 +20,19 @@
 #   make dev
 #
 # compile specific plugin and fire up api server:
-#   PLUGIN=<PLUGIN_NAME> make dev
-#   PLUGIN=<PLUGIN_NAME> PLUGIN2=<PLUGIN_NAME2> make dev
+#   PLUGIN=<PLUGIN_NAME[,PLUGIN_NAME2]> make dev
 #
 # compile all plugins and fire up api server in DEBUG MODE with `delve`:
 #   make debug
 #
 # compile specific plugin and fire up api server in DEBUG MODE with `delve`:
-#   PLUGIN=<PLUGIN_NAME> make dev
-#   PLUGIN=<PLUGIN_NAME> PLUGIN2=<PLUGIN_NAME> make dev
+#   PLUGIN=<PLUGIN_NAME[,PLUGIN_NAME2]> make debug
 
 set -e
 
 echo "Usage: "
 echo "  build all plugins:              $0 [golang build flags...]"
-echo "  build and keep one plugin only: PLUGIN=github,jira $0 [golang build 
flags...]"
+echo "  build and keep specified plugins only: PLUGIN=github,jira $0 [golang 
build flags...]"
 
 SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
 PLUGIN_SRC_DIR=$SCRIPT_DIR/../plugins
diff --git a/services/init.go b/services/init.go
index 27e8f91c5..204d548ee 100644
--- a/services/init.go
+++ b/services/init.go
@@ -23,7 +23,6 @@ import (
        "sync"
 
        "github.com/apache/incubator-devlake/errors"
-       "github.com/apache/incubator-devlake/impl"
 
        "github.com/apache/incubator-devlake/config"
        "github.com/apache/incubator-devlake/impl/dalgorm"
@@ -50,28 +49,22 @@ const failToCreateCronJob = "created cron job failed"
 // Init the services module
 func Init() {
        var err error
-       // basic resources initialization
        cfg = config.GetConfig()
        log = logger.Global
-       db, err = runner.NewGormDb(cfg, logger.Global.Nested("db"))
-       if err != nil {
-               panic(err)
-       }
-       // gorm doesn't support creating a PrepareStmt=false session from a 
PrepareStmt=true
-       // but the lockDatabase needs PrepareStmt=false for table locking, we 
have to deal with it here
-       lockingDb, err := runner.NewGormDbEx(cfg, 
logger.Global.Nested("migrator db"), &dal.SessionConfig{
-               PrepareStmt:            false,
-               SkipDefaultTransaction: true,
-       })
-       if err != nil {
-               panic(err)
-       }
-       err = lockDatabase(dalgorm.NewDalgorm(lockingDb))
+       db, err = runner.NewGormDb(cfg, log)
        if err != nil {
                panic(err)
        }
 
-       basicRes = impl.NewDefaultBasicRes(cfg, log, dalgorm.NewDalgorm(db))
+       // TODO: this is ugly, the lockDb / CreateAppBasicRes are coupled via 
global variables cfg/log
+       // it is too much for this refactor, let's solve it later
+
+       // lock the database to prevent multiple devlake instances from sharing 
the same one
+       lockDb()
+
+       // basic resources initialization
+       basicRes = runner.CreateBasicRes(cfg, log, db)
+
        // initialize db migrator
        migrator, err = runner.InitMigrator(basicRes)
        if err != nil {
@@ -82,12 +75,7 @@ func Init() {
 
        // now,
        // load plugins
-       err = runner.LoadPlugins(
-               cfg.GetString("PLUGIN_DIR"),
-               cfg,
-               logger.Global.Nested("plugin"),
-               db,
-       )
+       err = runner.LoadPlugins(basicRes)
        if err != nil {
                panic(err)
        }
@@ -129,3 +117,19 @@ func ExecuteMigration() errors.Error {
 func MigrationRequireConfirmation() bool {
        return migrator.HasPendingScripts()
 }
+
+func lockDb() {
+       // gorm doesn't support creating a PrepareStmt=false session from a 
PrepareStmt=true one,
+       // but lockDatabase needs PrepareStmt=false for table locking, so we 
have to deal with it here
+       lockingDb, err := runner.NewGormDbEx(cfg, 
logger.Global.Nested("migrator db"), &dal.SessionConfig{
+               PrepareStmt:            false,
+               SkipDefaultTransaction: true,
+       })
+       if err != nil {
+               panic(err)
+       }
+       err = lockDatabase(dalgorm.NewDalgorm(lockingDb))
+       if err != nil {
+               panic(err)
+       }
+}
diff --git a/services/pipeline_runner.go b/services/pipeline_runner.go
index 8eea52ebb..2d61bb970 100644
--- a/services/pipeline_runner.go
+++ b/services/pipeline_runner.go
@@ -39,9 +39,7 @@ type pipelineRunner struct {
 
 func (p *pipelineRunner) runPipelineStandalone() errors.Error {
        return runner.RunPipeline(
-               cfg,
-               p.logger,
-               db,
+               runner.CreateBasicRes(cfg, p.logger, db),
                p.pipeline.ID,
                func(taskIds []uint64) errors.Error {
                        return RunTasksStandalone(p.logger, taskIds)
diff --git a/services/task.go b/services/task.go
index f81db2cba..f7e1eaa68 100644
--- a/services/task.go
+++ b/services/task.go
@@ -322,9 +322,7 @@ func runTaskStandalone(parentLog core.Logger, taskId 
uint64) errors.Error {
        go updateTaskProgress(taskId, progress)
        err = runner.RunTask(
                ctx,
-               cfg,
-               parentLog,
-               db,
+               runner.CreateBasicRes(cfg, parentLog, db),
                progress,
                taskId,
        )
@@ -347,7 +345,7 @@ func updateTaskProgress(taskId uint64, progress chan 
core.RunningProgress) {
        progressDetail := data.ProgressDetail
        for p := range progress {
                runningTasks.mu.Lock()
-               runner.UpdateProgressDetail(db, log, taskId, progressDetail, &p)
+               runner.UpdateProgressDetail(basicRes, taskId, progressDetail, 
&p)
                runningTasks.mu.Unlock()
        }
 }
diff --git a/worker/app/pipeline_workflow.go b/worker/app/pipeline_workflow.go
index d3317c113..2943beb3e 100644
--- a/worker/app/pipeline_workflow.go
+++ b/worker/app/pipeline_workflow.go
@@ -30,15 +30,14 @@ import (
 
 // DevLakePipelineWorkflow FIXME ...
 func DevLakePipelineWorkflow(ctx workflow.Context, configJson []byte, pipelineId uint64, loggerConfig *core.LoggerConfig) errors.Error {
-       cfg, log, db, err := loadResources(configJson, loggerConfig)
+       basicRes, err := loadResources(configJson, loggerConfig)
        if err != nil {
                return errors.Convert(err)
        }
+       log := basicRes.GetLogger()
        log.Info("received pipeline #%d", pipelineId)
        err = runner.RunPipeline(
-               cfg,
-               log,
-               db,
+               basicRes,
                pipelineId,
                func(taskIds []uint64) errors.Error {
                        return runTasks(ctx, configJson, taskIds, log)
diff --git a/worker/app/shared.go b/worker/app/shared.go
index a46090995..8fe17484c 100644
--- a/worker/app/shared.go
+++ b/worker/app/shared.go
@@ -19,33 +19,34 @@ package app
 
 import (
        "bytes"
+
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/logger"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/runner"
        "github.com/spf13/viper"
-       "gorm.io/gorm"
 )
 
-func loadResources(configJson []byte, loggerConfig *core.LoggerConfig) (*viper.Viper, core.Logger, *gorm.DB, errors.Error) {
+func loadResources(configJson []byte, loggerConfig *core.LoggerConfig) (core.BasicRes, errors.Error) {
+       // TODO: should be redirected to server
+       globalLogger := logger.Global.Nested("worker")
        // prepare
        cfg := viper.New()
        cfg.SetConfigType("json")
        err := cfg.ReadConfig(bytes.NewBuffer(configJson))
        if err != nil {
-               return nil, nil, nil, errors.Convert(err)
+               globalLogger.Error(err, "failed to load resources")
+               return nil, errors.Convert(err)
        }
-       // TODO: should be redirected to server
-       globalLogger := logger.Global.Nested("worker")
        db, err := runner.NewGormDb(cfg, globalLogger)
        if err != nil {
-               return nil, nil, nil, errors.Convert(err)
+               return nil, errors.Convert(err)
        }
        log, err := getWorkerLogger(globalLogger, loggerConfig)
        if err != nil {
-               return nil, nil, nil, errors.Convert(err)
+               return nil, errors.Convert(err)
        }
-       return cfg, log, db, errors.Convert(err)
+       return runner.CreateBasicRes(cfg, log, db), nil
 }
 
 func getWorkerLogger(log core.Logger, logConfig *core.LoggerConfig) (core.Logger, errors.Error) {
diff --git a/worker/app/task_activity.go b/worker/app/task_activity.go
index b3781be42..c5cfc3734 100644
--- a/worker/app/task_activity.go
+++ b/worker/app/task_activity.go
@@ -19,6 +19,7 @@ package app
 
 import (
        "context"
+
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/models"
        "github.com/apache/incubator-devlake/plugins/core"
@@ -28,21 +29,22 @@ import (
 
 // DevLakeTaskActivity FIXME ...
 func DevLakeTaskActivity(ctx context.Context, configJson []byte, taskId uint64, loggerConfig *core.LoggerConfig) errors.Error {
-       cfg, log, db, err := loadResources(configJson, loggerConfig)
+       basicRes, err := loadResources(configJson, loggerConfig)
        if err != nil {
                return err
        }
+       log := basicRes.GetLogger()
        log.Info("received task #%d", taskId)
        progressDetail := &models.TaskProgressDetail{}
        progChan := make(chan core.RunningProgress)
        defer close(progChan)
        go func() {
                for p := range progChan {
-                       runner.UpdateProgressDetail(db, log, taskId, progressDetail, &p)
+                       runner.UpdateProgressDetail(basicRes, taskId, progressDetail, &p)
                        activity.RecordHeartbeat(ctx, progressDetail)
                }
        }()
-       err = runner.RunTask(ctx, cfg, log, db, progChan, taskId)
+       err = runner.RunTask(ctx, basicRes, progChan, taskId)
        if err != nil {
                log.Error(err, "failed to execute task #%d", taskId)
        }
diff --git a/worker/main.go b/worker/main.go
index d3bd6a754..b98911b94 100644
--- a/worker/main.go
+++ b/worker/main.go
@@ -18,10 +18,10 @@ limitations under the License.
 package main
 
 import (
-       "github.com/apache/incubator-devlake/errors"
        "log"
 
-       "github.com/apache/incubator-devlake/config"
+       "github.com/apache/incubator-devlake/errors"
+
        "github.com/apache/incubator-devlake/logger"
        "github.com/apache/incubator-devlake/runner"
        _ "github.com/apache/incubator-devlake/version"
@@ -31,22 +31,17 @@ import (
 )
 
 func main() {
-       // basic resources
-       cfg := config.GetConfig()
-       db, err := runner.NewGormDb(cfg, logger.Global)
-       if err != nil {
-               panic(err)
-       }
-       err = runner.LoadPlugins(cfg.GetString("PLUGIN_DIR"), cfg, logger.Global, db)
+       basicRes := runner.CreateAppBasicRes()
+       err := runner.LoadPlugins(basicRes)
        if err != nil {
                panic(err)
        }
 
        // establish temporal connection
-       TASK_QUEUE := cfg.GetString("TEMPORAL_TASK_QUEUE")
+       TASK_QUEUE := basicRes.GetConfig("TEMPORAL_TASK_QUEUE")
        // Create the client object just once per process
        c, err := errors.Convert01(client.NewClient(client.Options{
-               HostPort: cfg.GetString("TEMPORAL_URL"),
+               HostPort: basicRes.GetConfig("TEMPORAL_URL"),
                Logger:   app.NewTemporalLogger(logger.Global),
        }))
        if err != nil {


Reply via email to