This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 60c9921c2 refactor: use basicRes everywhere except the `service`
module (#3943)
60c9921c2 is described below
commit 60c9921c2c4cb832782a7a2d590326b19f297ac6
Author: Klesh Wong <[email protected]>
AuthorDate: Fri Dec 16 09:58:13 2022 +0800
refactor: use basicRes everywhere except the `service` module (#3943)
* docs: update comment to reflect latest status
* refactor: use basicRes everywhere except service
* fix: linting and tests
* fix: task logging files were miss placed
* fix: tasks should be ordered in run_pipeline
---
.gitignore | 3 +-
Makefile | 7 +-
helpers/e2ehelper/data_flow_tester.go | 2 +-
helpers/e2ehelper/data_flow_tester_test.go | 21 +---
impl/default_basic_res.go | 18 +++
models/task.go | 14 +++
plugins/ae/api/init.go | 6 +-
plugins/ae/impl/impl.go | 6 +-
plugins/azure/api/init.go | 6 +-
plugins/azure/impl/impl.go | 6 +-
plugins/bitbucket/api/init.go | 6 +-
plugins/bitbucket/impl/impl.go | 6 +-
plugins/core/basic_res.go | 2 +
plugins/core/dal/dal.go | 4 +
plugins/core/plugin_init.go | 4 +-
plugins/customize/impl/impl.go | 6 +-
plugins/dora/api/data.go | 64 ----------
plugins/dora/impl/impl.go | 8 +-
plugins/feishu/api/init.go | 6 +-
plugins/feishu/impl/impl.go | 35 +-----
plugins/gitee/api/init.go | 6 +-
plugins/gitee/impl/impl.go | 6 +-
plugins/gitextractor/main.go | 29 +++--
plugins/gitextractor/parser/repo.go | 1 +
plugins/github/api/init.go | 6 +-
plugins/github/impl/impl.go | 10 +-
plugins/github_graphql/impl/impl.go | 10 +-
plugins/gitlab/api/blueprint.go | 2 +-
plugins/gitlab/api/blueprint_V200_test.go | 4 +-
plugins/gitlab/api/blueprint_v200.go | 4 +-
plugins/gitlab/api/connection.go | 2 +-
plugins/gitlab/api/init.go | 10 +-
plugins/gitlab/api/proxy.go | 2 +-
plugins/gitlab/api/scope.go | 14 +--
plugins/gitlab/api/transformation_rule.go | 10 +-
plugins/gitlab/impl/impl.go | 7 +-
plugins/helper/default_task_context.go | 93 +++-----------
plugins/icla/impl/impl.go | 4 +-
plugins/jenkins/api/init.go | 6 +-
plugins/jenkins/impl/impl.go | 9 +-
plugins/jira/api/init.go | 6 +-
plugins/jira/impl/impl.go | 9 +-
plugins/org/api/handlers.go | 8 +-
plugins/org/impl/impl.go | 8 +-
plugins/pagerduty/api/init.go | 6 +-
plugins/pagerduty/impl/impl.go | 9 +-
plugins/refdiff/impl/impl.go | 7 --
plugins/tapd/api/init.go | 10 +-
plugins/tapd/impl/impl.go | 6 +-
plugins/webhook/api/init.go | 6 +-
plugins/webhook/impl/impl.go | 6 +-
plugins/zentao/api/init.go | 6 +-
plugins/zentao/impl/impl.go | 7 +-
plugins/azure/api/init.go => runner/basic_res.go | 32 +++--
runner/directrun.go | 40 +++---
runner/loader.go | 9 +-
runner/run_pipeline.go | 45 +++----
runner/run_task.go | 149 ++++++++++-------------
scripts/compile-plugins.sh | 8 +-
services/init.go | 50 ++++----
services/pipeline_runner.go | 4 +-
services/task.go | 6 +-
worker/app/pipeline_workflow.go | 7 +-
worker/app/shared.go | 17 +--
worker/app/task_activity.go | 8 +-
worker/main.go | 17 +--
66 files changed, 363 insertions(+), 588 deletions(-)
diff --git a/.gitignore b/.gitignore
index dda6d7670..452d5aa1a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -110,6 +110,7 @@ dist
# Stores VSCode versions used for testing VSCode extensions
.vscode-test
+_debug_bin
# yarn v2
.yarn/cache
@@ -157,4 +158,4 @@ mocks/
api/docs/swagger.json
api/docs/swagger.yaml
api/docs/docs.go
-*.result/
+*.result/
\ No newline at end of file
diff --git a/Makefile b/Makefile
index e818a7349..9df8fb0e0 100644
--- a/Makefile
+++ b/Makefile
@@ -22,12 +22,17 @@ TAG ?= $(shell git tag --points-at HEAD)
IMAGE_REPO ?= "apache"
VERSION = $(TAG)@$(SHA)
-dep:
+go-dep:
go install github.com/vektra/mockery/v2@latest
go install github.com/swaggo/swag/cmd/[email protected]
go install github.com/atombender/go-jsonschema/cmd/gojsonschema@latest
+ go install github.com/golangci/golangci-lint/cmd/[email protected]
+
+python-dep:
pip install -r requirements.txt
+dep: go-dep python-dep
+
swag:
swag init --parseDependency --parseInternal -o ./api/docs -g
./api/api.go -g plugins/*/api/*.go
@echo "visit the swagger document on
http://localhost:8080/swagger/index.html"
diff --git a/helpers/e2ehelper/data_flow_tester.go
b/helpers/e2ehelper/data_flow_tester.go
index f2c32d536..eb8c07392 100644
--- a/helpers/e2ehelper/data_flow_tester.go
+++ b/helpers/e2ehelper/data_flow_tester.go
@@ -191,7 +191,7 @@ func (t *DataFlowTester) Subtask(subtaskMeta
core.SubTaskMeta, taskData interfac
// SubtaskContext creates a subtask context
func (t *DataFlowTester) SubtaskContext(taskData interface{})
core.SubTaskContext {
- return helper.NewStandaloneSubTaskContext(context.Background(), t.Cfg,
t.Log, t.Db, t.Name, taskData)
+ return helper.NewStandaloneSubTaskContext(context.Background(),
runner.CreateBasicRes(t.Cfg, t.Log, t.Db), t.Name, taskData)
}
func filterColumn(column dal.ColumnMeta, opts TableOptions) bool {
diff --git a/helpers/e2ehelper/data_flow_tester_test.go
b/helpers/e2ehelper/data_flow_tester_test.go
index c13d99459..77bd1649d 100644
--- a/helpers/e2ehelper/data_flow_tester_test.go
+++ b/helpers/e2ehelper/data_flow_tester_test.go
@@ -53,29 +53,16 @@ func ExampleDataFlowTester() {
}
// import raw data table
-
dataflowTester.ImportCsvIntoRawTable("./tables/_raw_gitlab_api_projects.csv",
"_raw_gitlab_api_project")
+
dataflowTester.ImportCsvIntoRawTable("./tables/_raw_gitlab_api_issues.csv",
"_raw_gitlab_api_issues")
// verify extraction
dataflowTester.FlushTabler(gitlabModels.GitlabProject{})
- dataflowTester.Subtask(tasks.ExtractProjectMeta, taskData)
+ dataflowTester.Subtask(tasks.ExtractApiIssuesMeta, taskData)
dataflowTester.VerifyTable(
- gitlabModels.GitlabProject{},
- "tables/_tool_gitlab_projects.csv",
+ gitlabModels.GitlabIssue{},
+ "tables/_tool_gitlab_issues.csv",
[]string{
"gitlab_id",
- "name",
- "description",
- "default_branch",
- "path_with_namespace",
- "web_url",
- "creator_id",
- "visibility",
- "open_issues_count",
- "star_count",
- "forked_from_project_id",
- "forked_from_project_web_url",
- "created_date",
- "updated_date",
"_raw_data_params",
"_raw_data_table",
"_raw_data_id",
diff --git a/impl/default_basic_res.go b/impl/default_basic_res.go
index 8b5233337..5fc782990 100644
--- a/impl/default_basic_res.go
+++ b/impl/default_basic_res.go
@@ -45,6 +45,24 @@ func (c *DefaultBasicRes) GetLogger() core.Logger {
return c.logger
}
+// NestedLogger returns a new DefaultBasicRes with a new nested logger
+func (c *DefaultBasicRes) NestedLogger(name string) core.BasicRes {
+ return &DefaultBasicRes{
+ cfg: c.cfg,
+ logger: c.logger.Nested(name),
+ db: c.db,
+ }
+}
+
+// ReplaceLogger returns a new DefaultBasicRes with the specified logger
+func (c *DefaultBasicRes) ReplaceLogger(logger core.Logger) core.BasicRes {
+ return &DefaultBasicRes{
+ cfg: c.cfg,
+ logger: logger,
+ db: c.db,
+ }
+}
+
// NewDefaultBasicRes creates a new DefaultBasicRes instance
func NewDefaultBasicRes(
cfg *viper.Viper,
diff --git a/models/task.go b/models/task.go
index 0dd571a04..bfc9d8ab7 100644
--- a/models/task.go
+++ b/models/task.go
@@ -18,8 +18,10 @@ limitations under the License.
package models
import (
+ "encoding/json"
"time"
+ "github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/models/common"
"github.com/apache/incubator-devlake/plugins/core"
"gorm.io/datatypes"
@@ -89,3 +91,15 @@ func (Task) TableName() string {
func (Subtask) TableName() string {
return "_devlake_subtasks"
}
+
+func (task *Task) GetSubTasks() ([]string, errors.Error) {
+ var subtasks []string
+ err := errors.Convert(json.Unmarshal(task.Subtasks, &subtasks))
+ return subtasks, err
+}
+
+func (task *Task) GetOptions() (map[string]interface{}, errors.Error) {
+ var options map[string]interface{}
+ err := errors.Convert(json.Unmarshal(task.Options, &options))
+ return options, err
+}
diff --git a/plugins/ae/api/init.go b/plugins/ae/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/ae/api/init.go
+++ b/plugins/ae/api/init.go
@@ -21,16 +21,14 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/go-playground/validator/v10"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var vld *validator.Validate
var connectionHelper *helper.ConnectionApiHelper
var basicRes core.BasicRes
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
- basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+ basicRes = br
vld = validator.New()
connectionHelper = helper.NewConnectionHelper(
basicRes,
diff --git a/plugins/ae/impl/impl.go b/plugins/ae/impl/impl.go
index 3532720e3..8978ff8e1 100644
--- a/plugins/ae/impl/impl.go
+++ b/plugins/ae/impl/impl.go
@@ -27,8 +27,6 @@ import (
"github.com/apache/incubator-devlake/plugins/ae/tasks"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var _ core.PluginMeta = (*AE)(nil)
@@ -41,8 +39,8 @@ var _ core.CloseablePluginTask = (*AE)(nil)
type AE struct{}
-func (plugin AE) Init(config *viper.Viper, logger core.Logger, db *gorm.DB)
errors.Error {
- api.Init(config, logger, db)
+func (plugin AE) Init(basicRes core.BasicRes) errors.Error {
+ api.Init(basicRes)
return nil
}
diff --git a/plugins/azure/api/init.go b/plugins/azure/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/azure/api/init.go
+++ b/plugins/azure/api/init.go
@@ -21,16 +21,14 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/go-playground/validator/v10"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var vld *validator.Validate
var connectionHelper *helper.ConnectionApiHelper
var basicRes core.BasicRes
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
- basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+ basicRes = br
vld = validator.New()
connectionHelper = helper.NewConnectionHelper(
basicRes,
diff --git a/plugins/azure/impl/impl.go b/plugins/azure/impl/impl.go
index 29328c3a6..a7bc49de5 100644
--- a/plugins/azure/impl/impl.go
+++ b/plugins/azure/impl/impl.go
@@ -27,8 +27,6 @@ import (
"github.com/apache/incubator-devlake/plugins/azure/tasks"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
// make sure interface is implemented
@@ -49,8 +47,8 @@ func (plugin Azure) Description() string {
return "collect some Azure data"
}
-func (plugin Azure) Init(config *viper.Viper, logger core.Logger, db *gorm.DB)
errors.Error {
- api.Init(config, logger, db)
+func (plugin Azure) Init(basicRes core.BasicRes) errors.Error {
+ api.Init(basicRes)
return nil
}
diff --git a/plugins/bitbucket/api/init.go b/plugins/bitbucket/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/bitbucket/api/init.go
+++ b/plugins/bitbucket/api/init.go
@@ -21,16 +21,14 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/go-playground/validator/v10"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var vld *validator.Validate
var connectionHelper *helper.ConnectionApiHelper
var basicRes core.BasicRes
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
- basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+ basicRes = br
vld = validator.New()
connectionHelper = helper.NewConnectionHelper(
basicRes,
diff --git a/plugins/bitbucket/impl/impl.go b/plugins/bitbucket/impl/impl.go
index 37e53edc9..14279749e 100644
--- a/plugins/bitbucket/impl/impl.go
+++ b/plugins/bitbucket/impl/impl.go
@@ -27,8 +27,6 @@ import (
"github.com/apache/incubator-devlake/plugins/bitbucket/tasks"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var _ core.PluginMeta = (*Bitbucket)(nil)
@@ -42,8 +40,8 @@ var _ core.CloseablePluginTask = (*Bitbucket)(nil)
type Bitbucket string
-func (plugin Bitbucket) Init(config *viper.Viper, logger core.Logger, db
*gorm.DB) errors.Error {
- api.Init(config, logger, db)
+func (plugin Bitbucket) Init(basicRes core.BasicRes) errors.Error {
+ api.Init(basicRes)
return nil
}
diff --git a/plugins/core/basic_res.go b/plugins/core/basic_res.go
index f9ff60856..9460296dc 100644
--- a/plugins/core/basic_res.go
+++ b/plugins/core/basic_res.go
@@ -26,4 +26,6 @@ type BasicRes interface {
GetConfig(name string) string
GetLogger() Logger
GetDal() dal.Dal
+ NestedLogger(name string) BasicRes
+ ReplaceLogger(logger Logger) BasicRes
}
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index 6b1d80214..46077b753 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -293,3 +293,7 @@ const LockClause string = "Lock"
func Lock(write bool, nowait bool) Clause {
return Clause{Type: LockClause, Data: []bool{write, nowait}}
}
+
+func Expr(expr string, params ...interface{}) DalClause {
+ return DalClause{Expr: expr, Params: params}
+}
diff --git a/plugins/core/plugin_init.go b/plugins/core/plugin_init.go
index 974cc76f0..749cd5be2 100644
--- a/plugins/core/plugin_init.go
+++ b/plugins/core/plugin_init.go
@@ -19,11 +19,9 @@ package core
import (
"github.com/apache/incubator-devlake/errors"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
// PluginInit Implement this interface if plugin needed some initialization
type PluginInit interface {
- Init(config *viper.Viper, logger Logger, db *gorm.DB) errors.Error
+ Init(basicRes BasicRes) errors.Error
}
diff --git a/plugins/customize/impl/impl.go b/plugins/customize/impl/impl.go
index 9f196714a..6be99337a 100644
--- a/plugins/customize/impl/impl.go
+++ b/plugins/customize/impl/impl.go
@@ -23,10 +23,7 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/customize/api"
"github.com/apache/incubator-devlake/plugins/customize/tasks"
- "github.com/apache/incubator-devlake/plugins/helper"
"github.com/mitchellh/mapstructure"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var _ core.PluginMeta = (*Customize)(nil)
@@ -38,8 +35,7 @@ type Customize struct {
handlers *api.Handlers
}
-func (plugin *Customize) Init(config *viper.Viper, logger core.Logger, db
*gorm.DB) errors.Error {
- basicRes := helper.NewDefaultBasicRes(config, logger, db)
+func (plugin *Customize) Init(basicRes core.BasicRes) errors.Error {
plugin.handlers = api.NewHandlers(basicRes.GetDal())
return nil
}
diff --git a/plugins/dora/api/data.go b/plugins/dora/api/data.go
deleted file mode 100644
index 6a0ff30f4..000000000
--- a/plugins/dora/api/data.go
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package api
-
-import (
- "github.com/apache/incubator-devlake/errors"
- "github.com/apache/incubator-devlake/plugins/core"
- "net/http"
-)
-
-const RAW_DEPLOYMENTS_TABLE = `dora_deplyments`
-
-//TODO Please modify the folowing code to adapt to your plugin
-/*
-POST /plugins/dora/deployments
-{
- TODO
-}
-*/
-func PostDeployments(input *core.ApiResourceInput) (*core.ApiResourceOutput,
errors.Error) {
- // TODO
- return &core.ApiResourceOutput{Body: nil, Status: http.StatusOK}, nil
-}
-
-const RAW_ISSUES_TABLE = `dora_issues`
-
-//TODO Please modify the folowing code to adapt to your plugin
-/*
-POST /plugins/dora/issues
-{
- TODO
-}
-*/
-func PostIssues(input *core.ApiResourceInput) (*core.ApiResourceOutput,
errors.Error) {
- // TODO
- return &core.ApiResourceOutput{Body: nil, Status: http.StatusOK}, nil
-}
-
-//TODO Please modify the folowing code to adapt to your plugin
-/*
-POST /plugins/dora/issues/:id/close
-{
- TODO
-}
-*/
-func CloseIssues(input *core.ApiResourceInput) (*core.ApiResourceOutput,
errors.Error) {
- // TODO
- return &core.ApiResourceOutput{Body: nil, Status: http.StatusOK}, nil
-}
diff --git a/plugins/dora/impl/impl.go b/plugins/dora/impl/impl.go
index ca5a8f203..a3815e06b 100644
--- a/plugins/dora/impl/impl.go
+++ b/plugins/dora/impl/impl.go
@@ -19,17 +19,15 @@ package impl
import (
"encoding/json"
+
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/dora/models/migrationscripts"
"github.com/apache/incubator-devlake/plugins/dora/tasks"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
// make sure interface is implemented
var _ core.PluginMeta = (*Dora)(nil)
-var _ core.PluginInit = (*Dora)(nil)
var _ core.PluginTask = (*Dora)(nil)
var _ core.PluginModel = (*Dora)(nil)
var _ core.PluginMetric = (*Dora)(nil)
@@ -53,10 +51,6 @@ func (plugin Dora) SvgIcon() string {
</svg>`
}
-func (plugin Dora) Init(config *viper.Viper, logger core.Logger, db *gorm.DB)
errors.Error {
- return nil
-}
-
func (plugin Dora) RequiredDataEntities() (data []map[string]interface{}, err
errors.Error) {
return []map[string]interface{}{
{
diff --git a/plugins/feishu/api/init.go b/plugins/feishu/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/feishu/api/init.go
+++ b/plugins/feishu/api/init.go
@@ -21,16 +21,14 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/go-playground/validator/v10"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var vld *validator.Validate
var connectionHelper *helper.ConnectionApiHelper
var basicRes core.BasicRes
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
- basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+ basicRes = br
vld = validator.New()
connectionHelper = helper.NewConnectionHelper(
basicRes,
diff --git a/plugins/feishu/impl/impl.go b/plugins/feishu/impl/impl.go
index 98d61102d..aa637f7c5 100644
--- a/plugins/feishu/impl/impl.go
+++ b/plugins/feishu/impl/impl.go
@@ -22,9 +22,6 @@ import (
"github.com/apache/incubator-devlake/errors"
- "github.com/spf13/viper"
- "gorm.io/gorm"
-
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/feishu/api"
"github.com/apache/incubator-devlake/plugins/feishu/models"
@@ -43,36 +40,8 @@ var _ core.CloseablePluginTask = (*Feishu)(nil)
type Feishu struct{}
-func (plugin Feishu) Init(config *viper.Viper, logger core.Logger, db
*gorm.DB) errors.Error {
- api.Init(config, logger, db)
-
- // FIXME after config-ui support feishu plugin
- // save env to db where name=feishu
- connection := &models.FeishuConnection{}
- if db.Migrator().HasTable(connection) {
- if err := db.Find(connection, map[string]string{"name":
"Feishu"}).Error; err != nil {
- return errors.Convert(err)
- }
- if connection.ID != 0 {
- encodeKey := config.GetString(core.EncodeKeyEnvStr)
- connection.Endpoint =
config.GetString(`FEISHU_ENDPOINT`)
- connection.AppId = config.GetString(`FEISHU_APPID`)
- connection.SecretKey =
config.GetString(`FEISHU_APPSCRECT`)
- if connection.Endpoint != `` && connection.AppId != ``
&& connection.SecretKey != `` && encodeKey != `` {
- err := helper.UpdateEncryptFields(connection,
func(plaintext string) (string, errors.Error) {
- return core.Encrypt(encodeKey,
plaintext)
- })
- if err != nil {
- return err
- }
- // update from .env and save to db
- err =
errors.Convert(db.Updates(connection).Error)
- if err != nil {
- return err
- }
- }
- }
- }
+func (plugin Feishu) Init(basicRes core.BasicRes) errors.Error {
+ api.Init(basicRes)
return nil
}
diff --git a/plugins/gitee/api/init.go b/plugins/gitee/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/gitee/api/init.go
+++ b/plugins/gitee/api/init.go
@@ -21,16 +21,14 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/go-playground/validator/v10"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var vld *validator.Validate
var connectionHelper *helper.ConnectionApiHelper
var basicRes core.BasicRes
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
- basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+ basicRes = br
vld = validator.New()
connectionHelper = helper.NewConnectionHelper(
basicRes,
diff --git a/plugins/gitee/impl/impl.go b/plugins/gitee/impl/impl.go
index 38d512788..bd9327cf4 100644
--- a/plugins/gitee/impl/impl.go
+++ b/plugins/gitee/impl/impl.go
@@ -28,8 +28,6 @@ import (
"github.com/apache/incubator-devlake/plugins/gitee/models/migrationscripts"
"github.com/apache/incubator-devlake/plugins/gitee/tasks"
"github.com/apache/incubator-devlake/plugins/helper"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var _ core.PluginMeta = (*Gitee)(nil)
@@ -42,8 +40,8 @@ var _ core.CloseablePluginTask = (*Gitee)(nil)
type Gitee string
-func (plugin Gitee) Init(config *viper.Viper, logger core.Logger, db *gorm.DB)
errors.Error {
- api.Init(config, logger, db)
+func (plugin Gitee) Init(basicRes core.BasicRes) errors.Error {
+ api.Init(basicRes)
return nil
}
diff --git a/plugins/gitextractor/main.go b/plugins/gitextractor/main.go
index dbef0dee3..b1fba1ddc 100644
--- a/plugins/gitextractor/main.go
+++ b/plugins/gitextractor/main.go
@@ -22,14 +22,15 @@ import (
"flag"
"github.com/apache/incubator-devlake/config"
+ rootImpl "github.com/apache/incubator-devlake/impl"
+ "github.com/apache/incubator-devlake/impl/dalgorm"
"github.com/apache/incubator-devlake/logger"
"github.com/apache/incubator-devlake/plugins/gitextractor/impl"
"github.com/apache/incubator-devlake/plugins/gitextractor/models"
"github.com/apache/incubator-devlake/plugins/gitextractor/store"
"github.com/apache/incubator-devlake/plugins/gitextractor/tasks"
"github.com/apache/incubator-devlake/plugins/helper"
- "gorm.io/driver/mysql"
- "gorm.io/gorm"
+ "github.com/apache/incubator-devlake/runner"
)
// PluginEntry is a variable exported for Framework to search and load
@@ -42,9 +43,12 @@ func main() {
user := flag.String("user", "", "-user")
password := flag.String("password", "", "-password")
output := flag.String("output", "", "-output")
- db := flag.String("db", "", "-db")
+ dbUrl := flag.String("db", "", "-db")
flag.Parse()
+
+ cfg := config.GetConfig()
log := logger.Global.Nested("git extractor")
+
var storage models.Store
var err error
if *url == "" {
@@ -58,23 +62,22 @@ func main() {
if err != nil {
panic(err)
}
- } else if *db != "" {
- database, err := gorm.Open(mysql.Open(*db))
- if err != nil {
- panic(err)
- }
- basicRes := helper.NewDefaultBasicRes(nil, log, database)
- storage = store.NewDatabase(basicRes, *url)
+ } else if *dbUrl != "" {
+ cfg.Set("DB_URL", *dbUrl)
} else {
panic("either specify `-output` or `-db` argument as
destination")
}
+ db, err := runner.NewGormDb(cfg, log)
+ if err != nil {
+ panic(err)
+ }
+ basicRes := rootImpl.NewDefaultBasicRes(cfg, log,
dalgorm.NewDalgorm(db))
+ storage = store.NewDatabase(basicRes, *url)
defer storage.Close()
ctx := context.Background()
subTaskCtx := helper.NewStandaloneSubTaskContext(
ctx,
- config.GetConfig(),
- log,
- nil,
+ basicRes,
"git extractor",
nil,
)
diff --git a/plugins/gitextractor/parser/repo.go
b/plugins/gitextractor/parser/repo.go
index 73361f411..937441184 100644
--- a/plugins/gitextractor/parser/repo.go
+++ b/plugins/gitextractor/parser/repo.go
@@ -207,6 +207,7 @@ func (r *GitRepo) CollectCommits(subtaskCtx
core.SubTaskContext) errors.Error {
if err != nil {
return err
}
+ // TODO: this defeat the csv mode!
db := subtaskCtx.GetDal()
components := make([]code.Component, 0)
err = db.All(&components, dal.From(components), dal.Where("repo_id= ?",
r.id))
diff --git a/plugins/github/api/init.go b/plugins/github/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/github/api/init.go
+++ b/plugins/github/api/init.go
@@ -21,16 +21,14 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/go-playground/validator/v10"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var vld *validator.Validate
var connectionHelper *helper.ConnectionApiHelper
var basicRes core.BasicRes
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
- basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+ basicRes = br
vld = validator.New()
connectionHelper = helper.NewConnectionHelper(
basicRes,
diff --git a/plugins/github/impl/impl.go b/plugins/github/impl/impl.go
index b254990df..dcfbf3183 100644
--- a/plugins/github/impl/impl.go
+++ b/plugins/github/impl/impl.go
@@ -19,10 +19,12 @@ package impl
import (
"fmt"
- "github.com/apache/incubator-devlake/plugins/core/dal"
"strings"
"time"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
+ "gorm.io/gorm"
+
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/github/api"
@@ -30,8 +32,6 @@ import (
"github.com/apache/incubator-devlake/plugins/github/models/migrationscripts"
"github.com/apache/incubator-devlake/plugins/github/tasks"
"github.com/apache/incubator-devlake/plugins/helper"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var _ core.PluginMeta = (*Github)(nil)
@@ -57,8 +57,8 @@ func (plugin Github) TransformationRule() interface{} {
return &models.GithubTransformationRule{}
}
-func (plugin Github) Init(config *viper.Viper, logger core.Logger, db
*gorm.DB) errors.Error {
- api.Init(config, logger, db)
+func (plugin Github) Init(basicRes core.BasicRes) errors.Error {
+ api.Init(basicRes)
return nil
}
diff --git a/plugins/github_graphql/impl/impl.go
b/plugins/github_graphql/impl/impl.go
index 2d40445a1..bf36d3bef 100644
--- a/plugins/github_graphql/impl/impl.go
+++ b/plugins/github_graphql/impl/impl.go
@@ -20,11 +20,12 @@ package impl
import (
"context"
"fmt"
- githubImpl "github.com/apache/incubator-devlake/plugins/github/impl"
"reflect"
"strings"
"time"
+ githubImpl "github.com/apache/incubator-devlake/plugins/github/impl"
+
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/github/models"
@@ -32,14 +33,11 @@ import (
"github.com/apache/incubator-devlake/plugins/github_graphql/tasks"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/merico-dev/graphql"
- "github.com/spf13/viper"
"golang.org/x/oauth2"
- "gorm.io/gorm"
)
// make sure interface is implemented
var _ core.PluginMeta = (*GithubGraphql)(nil)
-var _ core.PluginInit = (*GithubGraphql)(nil)
var _ core.PluginTask = (*GithubGraphql)(nil)
var _ core.PluginApi = (*GithubGraphql)(nil)
var _ core.PluginModel = (*GithubGraphql)(nil)
@@ -64,10 +62,6 @@ func (plugin GithubGraphql) Description() string {
return "collect some GithubGraphql data"
}
-func (plugin GithubGraphql) Init(config *viper.Viper, logger core.Logger, db
*gorm.DB) errors.Error {
- return nil
-}
-
func (plugin GithubGraphql) GetTablesInfo() []core.Tabler {
return []core.Tabler{}
}
diff --git a/plugins/gitlab/api/blueprint.go b/plugins/gitlab/api/blueprint.go
index 178fc4fd4..636258202 100644
--- a/plugins/gitlab/api/blueprint.go
+++ b/plugins/gitlab/api/blueprint.go
@@ -54,7 +54,7 @@ func MakePipelinePlan(subtaskMetas []core.SubTaskMeta,
connectionId uint64, scop
},
10*time.Second,
connection.Proxy,
- BasicRes,
+ basicRes,
)
if err != nil {
return nil, err
diff --git a/plugins/gitlab/api/blueprint_V200_test.go
b/plugins/gitlab/api/blueprint_V200_test.go
index 1a8a5b5d8..8511cdadc 100644
--- a/plugins/gitlab/api/blueprint_V200_test.go
+++ b/plugins/gitlab/api/blueprint_V200_test.go
@@ -174,7 +174,7 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
assert.Equal(t, err, nil)
// Refresh Global Variables and set the sql mock
- BasicRes = unithelper.DummyBasicRes(func(mockDal *mocks.Dal) {
+ basicRes = unithelper.DummyBasicRes(func(mockDal *mocks.Dal) {
mockDal.On("First", mock.Anything, mock.Anything).Run(func(args
mock.Arguments) {
dst := args.Get(0).(*models.GitlabConnection)
*dst = *testGitlabConnection
@@ -191,7 +191,7 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
}).Return(nil).Once()
})
connectionHelper = helper.NewConnectionHelper(
- BasicRes,
+ basicRes,
validator.New(),
)
diff --git a/plugins/gitlab/api/blueprint_v200.go
b/plugins/gitlab/api/blueprint_v200.go
index 7019cd4d2..b3228fc6d 100644
--- a/plugins/gitlab/api/blueprint_v200.go
+++ b/plugins/gitlab/api/blueprint_v200.go
@@ -179,7 +179,7 @@ func makePipelinePlanV200(subtaskMetas []core.SubTaskMeta,
scopes []*core.Bluepr
// GetRepoByConnectionIdAndscopeId get tbe repo by the connectionId and the
scopeId
func GetRepoByConnectionIdAndscopeId(connectionId uint64, scopeId string)
(*models.GitlabProject, errors.Error) {
repo := &models.GitlabProject{}
- db := BasicRes.GetDal()
+ db := basicRes.GetDal()
err := db.First(repo, dal.Where("connection_id = ? AND gitlab_id = ?",
connectionId, scopeId))
if err != nil {
if db.IsErrorNotFound(err) {
@@ -196,7 +196,7 @@ func GetTransformationRuleByRepo(repo
*models.GitlabProject) (*models.GitlabTran
transformationRules := &models.GitlabTransformationRule{}
transformationRuleId := repo.TransformationRuleId
if transformationRuleId != 0 {
- db := BasicRes.GetDal()
+ db := basicRes.GetDal()
err := db.First(transformationRules, dal.Where("id = ?",
transformationRuleId))
if err != nil {
if db.IsErrorNotFound(err) {
diff --git a/plugins/gitlab/api/connection.go b/plugins/gitlab/api/connection.go
index 394868fb3..7cdfd66dc 100644
--- a/plugins/gitlab/api/connection.go
+++ b/plugins/gitlab/api/connection.go
@@ -53,7 +53,7 @@ func TestConnection(input *core.ApiResourceInput)
(*core.ApiResourceOutput, erro
},
3*time.Second,
connection.Proxy,
- BasicRes,
+ basicRes,
)
if err != nil {
return nil, errors.Convert(err)
diff --git a/plugins/gitlab/api/init.go b/plugins/gitlab/api/init.go
index f1fb68f9b..ff2aa65fd 100644
--- a/plugins/gitlab/api/init.go
+++ b/plugins/gitlab/api/init.go
@@ -21,19 +21,17 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/go-playground/validator/v10"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var vld *validator.Validate
var connectionHelper *helper.ConnectionApiHelper
-var BasicRes core.BasicRes
+var basicRes core.BasicRes
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
- BasicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+ basicRes = br
vld = validator.New()
connectionHelper = helper.NewConnectionHelper(
- BasicRes,
+ basicRes,
vld,
)
}
diff --git a/plugins/gitlab/api/proxy.go b/plugins/gitlab/api/proxy.go
index 999cb9bfe..b9c95baef 100644
--- a/plugins/gitlab/api/proxy.go
+++ b/plugins/gitlab/api/proxy.go
@@ -48,7 +48,7 @@ func Proxy(input *core.ApiResourceInput)
(*core.ApiResourceOutput, errors.Error)
},
TimeOut,
connection.Proxy,
- BasicRes,
+ basicRes,
)
if err != nil {
return nil, err
diff --git a/plugins/gitlab/api/scope.go b/plugins/gitlab/api/scope.go
index 13f3b6149..db3358460 100644
--- a/plugins/gitlab/api/scope.go
+++ b/plugins/gitlab/api/scope.go
@@ -77,7 +77,7 @@ func PutScope(input *core.ApiResourceInput)
(*core.ApiResourceOutput, errors.Err
return nil, err
}
}
- err = BasicRes.GetDal().CreateOrUpdate(projects.Data)
+ err = basicRes.GetDal().CreateOrUpdate(projects.Data)
if err != nil {
return nil, errors.Default.Wrap(err, "error on saving
GitlabProject")
}
@@ -102,7 +102,7 @@ func UpdateScope(input *core.ApiResourceInput)
(*core.ApiResourceOutput, errors.
return nil, errors.BadInput.New("invalid connectionId or
projectId")
}
var project models.GitlabProject
- err := BasicRes.GetDal().First(&project, dal.Where("connection_id = ?
AND gitlab_id = ?", connectionId, projectId))
+ err := basicRes.GetDal().First(&project, dal.Where("connection_id = ?
AND gitlab_id = ?", connectionId, projectId))
if err != nil {
return nil, errors.Default.Wrap(err, "getting GitlabProject
error")
}
@@ -114,7 +114,7 @@ func UpdateScope(input *core.ApiResourceInput)
(*core.ApiResourceOutput, errors.
if err != nil {
return nil, err
}
- err = BasicRes.GetDal().Update(project)
+ err = basicRes.GetDal().Update(project)
if err != nil {
return nil, errors.Default.Wrap(err, "error on saving
GitlabProject")
}
@@ -137,7 +137,7 @@ func GetScopeList(input *core.ApiResourceInput)
(*core.ApiResourceOutput, errors
return nil, errors.BadInput.New("invalid path params")
}
limit, offset := helper.GetLimitOffset(input.Query, "pageSize", "page")
- err := BasicRes.GetDal().All(&projects, dal.Where("connection_id = ?",
connectionId), dal.Limit(limit), dal.Offset(offset))
+ err := basicRes.GetDal().All(&projects, dal.Where("connection_id = ?",
connectionId), dal.Limit(limit), dal.Offset(offset))
if err != nil {
return nil, err
}
@@ -149,7 +149,7 @@ func GetScopeList(input *core.ApiResourceInput)
(*core.ApiResourceOutput, errors
}
var rules []models.GitlabTransformationRule
if len(ruleIds) > 0 {
- err = BasicRes.GetDal().All(&rules, dal.Where("id IN (?)",
ruleIds))
+ err = basicRes.GetDal().All(&rules, dal.Where("id IN (?)",
ruleIds))
if err != nil {
return nil, err
}
@@ -183,7 +183,7 @@ func GetScope(input *core.ApiResourceInput)
(*core.ApiResourceOutput, errors.Err
if connectionId*projectId == 0 {
return nil, errors.BadInput.New("invalid path params")
}
- err := BasicRes.GetDal().First(&project, dal.Where("connection_id = ?
AND gitlab_id = ?", connectionId, projectId))
+ err := basicRes.GetDal().First(&project, dal.Where("connection_id = ?
AND gitlab_id = ?", connectionId, projectId))
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, errors.NotFound.New("record not found")
}
@@ -192,7 +192,7 @@ func GetScope(input *core.ApiResourceInput)
(*core.ApiResourceOutput, errors.Err
}
var rule models.GitlabTransformationRule
if project.TransformationRuleId > 0 {
- err = BasicRes.GetDal().First(&rule, dal.Where("id = ?",
project.TransformationRuleId))
+ err = basicRes.GetDal().First(&rule, dal.Where("id = ?",
project.TransformationRuleId))
if err != nil {
return nil, err
}
diff --git a/plugins/gitlab/api/transformation_rule.go
b/plugins/gitlab/api/transformation_rule.go
index 78dc08b2b..1c56b46d0 100644
--- a/plugins/gitlab/api/transformation_rule.go
+++ b/plugins/gitlab/api/transformation_rule.go
@@ -45,7 +45,7 @@ func CreateTransformationRule(input *core.ApiResourceInput)
(*core.ApiResourceOu
if err != nil {
return nil, errors.Default.Wrap(err, "error in decoding
transformation rule")
}
- err = BasicRes.GetDal().Create(&rule)
+ err = basicRes.GetDal().Create(&rule)
if err != nil {
return nil, errors.Default.Wrap(err, "error on saving
TransformationRule")
}
@@ -69,7 +69,7 @@ func UpdateTransformationRule(input *core.ApiResourceInput)
(*core.ApiResourceOu
return nil, errors.Default.Wrap(err, "the transformation rule
ID should be an integer")
}
var old models.GitlabTransformationRule
- err = BasicRes.GetDal().First(&old, dal.Where("id = ?",
transformationRuleId))
+ err = basicRes.GetDal().First(&old, dal.Where("id = ?",
transformationRuleId))
if err != nil {
return nil, errors.Default.Wrap(err, "error on saving
TransformationRule")
}
@@ -78,7 +78,7 @@ func UpdateTransformationRule(input *core.ApiResourceInput)
(*core.ApiResourceOu
return nil, errors.Default.Wrap(err, "error decoding map into
transformationRule")
}
old.ID = transformationRuleId
- err = BasicRes.GetDal().Update(&old, dal.Where("id = ?",
transformationRuleId))
+ err = basicRes.GetDal().Update(&old, dal.Where("id = ?",
transformationRuleId))
if err != nil {
return nil, errors.Default.Wrap(err, "error on saving
TransformationRule")
}
@@ -100,7 +100,7 @@ func GetTransformationRule(input *core.ApiResourceInput)
(*core.ApiResourceOutpu
return nil, errors.Default.Wrap(err, "the transformation rule
ID should be an integer")
}
var rule models.GitlabTransformationRule
- err = BasicRes.GetDal().First(&rule, dal.Where("id = ?",
transformationRuleId))
+ err = basicRes.GetDal().First(&rule, dal.Where("id = ?",
transformationRuleId))
if err != nil {
return nil, errors.Default.Wrap(err, "error on get
TransformationRule")
}
@@ -120,7 +120,7 @@ func GetTransformationRule(input *core.ApiResourceInput)
(*core.ApiResourceOutpu
func GetTransformationRuleList(input *core.ApiResourceInput)
(*core.ApiResourceOutput, errors.Error) {
var rules []models.GitlabTransformationRule
limit, offset := helper.GetLimitOffset(input.Query, "pageSize", "page")
- err := BasicRes.GetDal().All(&rules, dal.Limit(limit),
dal.Offset(offset))
+ err := basicRes.GetDal().All(&rules, dal.Limit(limit),
dal.Offset(offset))
if err != nil {
return nil, errors.Default.Wrap(err, "error on get
TransformationRule list")
}
diff --git a/plugins/gitlab/impl/impl.go b/plugins/gitlab/impl/impl.go
index fac31b7e4..49f5738d0 100644
--- a/plugins/gitlab/impl/impl.go
+++ b/plugins/gitlab/impl/impl.go
@@ -29,9 +29,6 @@ import (
"github.com/apache/incubator-devlake/plugins/gitlab/models/migrationscripts"
"github.com/apache/incubator-devlake/plugins/gitlab/tasks"
"github.com/apache/incubator-devlake/plugins/helper"
-
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var _ interface {
@@ -48,8 +45,8 @@ var _ interface {
type Gitlab string
-func (plugin Gitlab) Init(config *viper.Viper, logger core.Logger, db
*gorm.DB) errors.Error {
- api.Init(config, logger, db)
+func (plugin Gitlab) Init(basicRes core.BasicRes) errors.Error {
+ api.Init(basicRes)
return nil
}
diff --git a/plugins/helper/default_task_context.go
b/plugins/helper/default_task_context.go
index f9aea6684..b65701d8c 100644
--- a/plugins/helper/default_task_context.go
+++ b/plugins/helper/default_task_context.go
@@ -20,67 +20,20 @@ package helper
import (
"context"
"fmt"
- "github.com/apache/incubator-devlake/errors"
"sync"
"sync/atomic"
"time"
- "github.com/apache/incubator-devlake/impl/dalgorm"
+ "github.com/apache/incubator-devlake/errors"
+
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/plugins/core/dal"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
-// bridge to current implementation at this point
-// TODO: implement another TaskContext for distributed runner/worker
-
-// DefaultBasicRes offers a defult BasicRes implementation
-// TODO: move to `impl` package
-type DefaultBasicRes struct {
- cfg *viper.Viper
- logger core.Logger
- db *gorm.DB
- dal dal.Dal
-}
-
-// GetConfig FIXME ...
-func (c *DefaultBasicRes) GetConfig(name string) string {
- return c.cfg.GetString(name)
-}
-
-// GetDb FIXME ...
-func (c *DefaultBasicRes) GetDb() *gorm.DB {
- return c.db
-}
-
-// GetDal FIXME ...
-func (c *DefaultBasicRes) GetDal() dal.Dal {
- return c.dal
-}
-
-// GetLogger FIXME ...
-func (c *DefaultBasicRes) GetLogger() core.Logger {
- return c.logger
-}
-
-// NewDefaultBasicRes returns a new DefaultBasicRes instance
-func NewDefaultBasicRes(
- cfg *viper.Viper,
- logger core.Logger,
- db *gorm.DB,
-) *DefaultBasicRes {
- return &DefaultBasicRes{
- cfg: cfg,
- logger: logger,
- db: db,
- dal: dalgorm.NewDalgorm(db),
- }
-}
+// TODO: move this file to `impl` module
// shared by TasContext and SubTaskContext
type defaultExecContext struct {
- *DefaultBasicRes
+ core.BasicRes
ctx context.Context
name string
data interface{}
@@ -92,19 +45,13 @@ type defaultExecContext struct {
func newDefaultExecContext(
ctx context.Context,
- cfg *viper.Viper,
- logger core.Logger,
- db *gorm.DB,
+ basicRes core.BasicRes,
name string,
data interface{},
progress chan core.RunningProgress,
) *defaultExecContext {
return &defaultExecContext{
- DefaultBasicRes: NewDefaultBasicRes(
- cfg,
- logger,
- db,
- ),
+ BasicRes: basicRes,
ctx: ctx,
name: name,
data: data,
@@ -158,9 +105,7 @@ func (c *defaultExecContext) IncProgress(progressType
core.ProgressType, quantit
func (c *defaultExecContext) fork(name string) *defaultExecContext {
return newDefaultExecContext(
c.ctx,
- c.cfg,
- c.logger.Nested(name),
- c.db,
+ c.BasicRes.NestedLogger(name),
name,
c.data,
c.progress,
@@ -177,13 +122,13 @@ type DefaultTaskContext struct {
// SetProgress FIXME ...
func (c *DefaultTaskContext) SetProgress(current int, total int) {
c.defaultExecContext.SetProgress(core.TaskSetProgress, current, total)
- c.logger.Info("total step: %d", c.total)
+ c.BasicRes.GetLogger().Info("total step: %d", c.total)
}
// IncProgress FIXME ...
func (c *DefaultTaskContext) IncProgress(quantity int) {
c.defaultExecContext.IncProgress(core.TaskIncProgress, quantity)
- c.logger.Info("finished step: %d / %d", c.current, c.total)
+ c.BasicRes.GetLogger().Info("finished step: %d / %d", c.current,
c.total)
}
// DefaultSubTaskContext is default implementation
@@ -197,7 +142,7 @@ type DefaultSubTaskContext struct {
func (c *DefaultSubTaskContext) SetProgress(current int, total int) {
c.defaultExecContext.SetProgress(core.SubTaskSetProgress, current,
total)
if total > -1 {
- c.logger.Info("total jobs: %d", c.total)
+ c.BasicRes.GetLogger().Info("total jobs: %d", c.total)
}
}
@@ -206,24 +151,22 @@ func (c *DefaultSubTaskContext) IncProgress(quantity int)
{
c.defaultExecContext.IncProgress(core.SubTaskIncProgress, quantity)
if c.LastProgressTime.IsZero() ||
c.LastProgressTime.Add(3*time.Second).Before(time.Now()) || c.current%1000 == 0
{
c.LastProgressTime = time.Now()
- c.logger.Info("finished records: %d", c.current)
+ c.BasicRes.GetLogger().Info("finished records: %d", c.current)
} else {
- c.logger.Debug("finished records: %d", c.current)
+ c.BasicRes.GetLogger().Debug("finished records: %d", c.current)
}
}
-// NewDefaultTaskContext FIXME ...
+// NewDefaultTaskContext holds everything needed by the task execution.
func NewDefaultTaskContext(
ctx context.Context,
- cfg *viper.Viper,
- logger core.Logger,
- db *gorm.DB,
+ basicRes core.BasicRes,
name string,
subtasks map[string]bool,
progress chan core.RunningProgress,
) core.TaskContext {
return &DefaultTaskContext{
- newDefaultExecContext(ctx, cfg, logger, db, name, nil,
progress),
+ newDefaultExecContext(ctx, basicRes, name, nil, progress),
subtasks,
make(map[string]*DefaultSubTaskContext),
}
@@ -259,14 +202,12 @@ func (c *DefaultTaskContext) SubTaskContext(subtask
string) (core.SubTaskContext
// going through the usual workflow.
func NewStandaloneSubTaskContext(
ctx context.Context,
- cfg *viper.Viper,
- logger core.Logger,
- db *gorm.DB,
+ basicRes core.BasicRes,
name string,
data interface{},
) core.SubTaskContext {
return &DefaultSubTaskContext{
- newDefaultExecContext(ctx, cfg, logger, db, name, data, nil),
+ newDefaultExecContext(ctx, basicRes, name, data, nil),
nil,
time.Time{},
}
diff --git a/plugins/icla/impl/impl.go b/plugins/icla/impl/impl.go
index 00900de20..d2e47aa5f 100644
--- a/plugins/icla/impl/impl.go
+++ b/plugins/icla/impl/impl.go
@@ -26,8 +26,6 @@ import (
"github.com/apache/incubator-devlake/plugins/icla/models"
"github.com/apache/incubator-devlake/plugins/icla/models/migrationscripts"
"github.com/apache/incubator-devlake/plugins/icla/tasks"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
// make sure interface is implemented
@@ -45,7 +43,7 @@ func (plugin Icla) Description() string {
return "collect some Icla data"
}
-func (plugin Icla) Init(config *viper.Viper, logger core.Logger, db *gorm.DB)
errors.Error {
+func (plugin Icla) Init(basicRes core.BasicRes) errors.Error {
return nil
}
diff --git a/plugins/jenkins/api/init.go b/plugins/jenkins/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/jenkins/api/init.go
+++ b/plugins/jenkins/api/init.go
@@ -21,16 +21,14 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/go-playground/validator/v10"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var vld *validator.Validate
var connectionHelper *helper.ConnectionApiHelper
var basicRes core.BasicRes
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
- basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+ basicRes = br
vld = validator.New()
connectionHelper = helper.NewConnectionHelper(
basicRes,
diff --git a/plugins/jenkins/impl/impl.go b/plugins/jenkins/impl/impl.go
index 49a28b884..278906673 100644
--- a/plugins/jenkins/impl/impl.go
+++ b/plugins/jenkins/impl/impl.go
@@ -19,10 +19,11 @@ package impl
import (
"fmt"
- "github.com/apache/incubator-devlake/plugins/core/dal"
"strings"
"time"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
+
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
@@ -30,8 +31,6 @@ import (
"github.com/apache/incubator-devlake/plugins/jenkins/models"
"github.com/apache/incubator-devlake/plugins/jenkins/models/migrationscripts"
"github.com/apache/incubator-devlake/plugins/jenkins/tasks"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var _ core.PluginMeta = (*Jenkins)(nil)
@@ -45,8 +44,8 @@ var _ core.PluginSource = (*Jenkins)(nil)
type Jenkins struct{}
-func (plugin Jenkins) Init(config *viper.Viper, logger core.Logger, db
*gorm.DB) errors.Error {
- api.Init(config, logger, db)
+func (plugin Jenkins) Init(basicRes core.BasicRes) errors.Error {
+ api.Init(basicRes)
return nil
}
diff --git a/plugins/jira/api/init.go b/plugins/jira/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/jira/api/init.go
+++ b/plugins/jira/api/init.go
@@ -21,16 +21,14 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/go-playground/validator/v10"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var vld *validator.Validate
var connectionHelper *helper.ConnectionApiHelper
var basicRes core.BasicRes
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
- basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+ basicRes = br
vld = validator.New()
connectionHelper = helper.NewConnectionHelper(
basicRes,
diff --git a/plugins/jira/impl/impl.go b/plugins/jira/impl/impl.go
index 67fe3f87e..7a9cdf013 100644
--- a/plugins/jira/impl/impl.go
+++ b/plugins/jira/impl/impl.go
@@ -18,7 +18,6 @@ limitations under the License.
package impl
import (
- goerror "errors"
"fmt"
"net/http"
"time"
@@ -31,8 +30,6 @@ import (
"github.com/apache/incubator-devlake/plugins/jira/models"
"github.com/apache/incubator-devlake/plugins/jira/models/migrationscripts"
"github.com/apache/incubator-devlake/plugins/jira/tasks"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var _ core.PluginMeta = (*Jira)(nil)
@@ -60,8 +57,8 @@ func (plugin Jira) TransformationRule() interface{} {
return &models.JiraTransformationRule{}
}
-func (plugin *Jira) Init(config *viper.Viper, logger core.Logger, db *gorm.DB)
errors.Error {
- api.Init(config, logger, db)
+func (plugin *Jira) Init(basicRes core.BasicRes) errors.Error {
+ api.Init(basicRes)
return nil
}
@@ -184,7 +181,7 @@ func (plugin Jira) PrepareTaskData(taskCtx
core.TaskContext, options map[string]
if op.TransformationRules == nil && op.TransformationRuleId != 0 {
var transformationRule models.JiraTransformationRule
err = taskCtx.GetDal().First(&transformationRule, dal.Where("id
= ?", op.TransformationRuleId))
- if err != nil && !goerror.Is(err, gorm.ErrRecordNotFound) {
+ if err != nil && db.IsErrorNotFound(err) {
return nil, errors.BadInput.Wrap(err, "fail to get
transformationRule")
}
op.TransformationRules, err =
tasks.MakeTransformationRules(transformationRule)
diff --git a/plugins/org/api/handlers.go b/plugins/org/api/handlers.go
index 95d7c4103..8596b6476 100644
--- a/plugins/org/api/handlers.go
+++ b/plugins/org/api/handlers.go
@@ -19,11 +19,11 @@ package api
import (
"encoding/csv"
- "github.com/apache/incubator-devlake/errors"
"net/http"
+ "github.com/apache/incubator-devlake/errors"
+
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/gocarina/gocsv"
)
@@ -33,8 +33,8 @@ type Handlers struct {
store store
}
-func NewHandlers(db dal.Dal, basicRes core.BasicRes) *Handlers {
- return &Handlers{store: NewDbStore(db, basicRes)}
+func NewHandlers(basicRes core.BasicRes) *Handlers {
+ return &Handlers{store: NewDbStore(basicRes.GetDal(), basicRes)}
}
func (h *Handlers) unmarshal(r *http.Request, items interface{}) errors.Error {
diff --git a/plugins/org/impl/impl.go b/plugins/org/impl/impl.go
index 76ed1ba9b..902eca8f1 100644
--- a/plugins/org/impl/impl.go
+++ b/plugins/org/impl/impl.go
@@ -19,13 +19,10 @@ package impl
import (
"github.com/apache/incubator-devlake/errors"
- "github.com/apache/incubator-devlake/impl/dalgorm"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/plugins/org/api"
"github.com/apache/incubator-devlake/plugins/org/tasks"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var _ core.PluginMeta = (*Org)(nil)
@@ -37,9 +34,8 @@ type Org struct {
handlers *api.Handlers
}
-func (plugin *Org) Init(config *viper.Viper, logger core.Logger, db *gorm.DB)
errors.Error {
- basicRes := helper.NewDefaultBasicRes(config, logger, db)
- plugin.handlers = api.NewHandlers(dalgorm.NewDalgorm(db), basicRes)
+func (plugin *Org) Init(basicRes core.BasicRes) errors.Error {
+ plugin.handlers = api.NewHandlers(basicRes)
return nil
}
diff --git a/plugins/pagerduty/api/init.go b/plugins/pagerduty/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/pagerduty/api/init.go
+++ b/plugins/pagerduty/api/init.go
@@ -21,16 +21,14 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/go-playground/validator/v10"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var vld *validator.Validate
var connectionHelper *helper.ConnectionApiHelper
var basicRes core.BasicRes
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
- basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+ basicRes = br
vld = validator.New()
connectionHelper = helper.NewConnectionHelper(
basicRes,
diff --git a/plugins/pagerduty/impl/impl.go b/plugins/pagerduty/impl/impl.go
index dcaabb3e8..1c43f519f 100644
--- a/plugins/pagerduty/impl/impl.go
+++ b/plugins/pagerduty/impl/impl.go
@@ -19,6 +19,8 @@ package impl
import (
"fmt"
+ "time"
+
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/helpers/pluginhelper/tap"
"github.com/apache/incubator-devlake/plugins/core"
@@ -27,9 +29,6 @@ import (
"github.com/apache/incubator-devlake/plugins/pagerduty/models"
"github.com/apache/incubator-devlake/plugins/pagerduty/models/migrationscripts"
"github.com/apache/incubator-devlake/plugins/pagerduty/tasks"
- "github.com/spf13/viper"
- "gorm.io/gorm"
- "time"
)
// make sure interface is implemented
@@ -46,8 +45,8 @@ func (plugin PagerDuty) Description() string {
return "collect some PagerDuty data"
}
-func (plugin PagerDuty) Init(config *viper.Viper, logger core.Logger, db
*gorm.DB) errors.Error {
- api.Init(config, logger, db)
+func (plugin PagerDuty) Init(basicRes core.BasicRes) errors.Error {
+ api.Init(basicRes)
return nil
}
diff --git a/plugins/refdiff/impl/impl.go b/plugins/refdiff/impl/impl.go
index 1007b2d38..c81bc137f 100644
--- a/plugins/refdiff/impl/impl.go
+++ b/plugins/refdiff/impl/impl.go
@@ -22,13 +22,10 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/plugins/refdiff/tasks"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
// make sure interface is implemented
var _ core.PluginMeta = (*RefDiff)(nil)
-var _ core.PluginInit = (*RefDiff)(nil)
var _ core.PluginTask = (*RefDiff)(nil)
var _ core.PluginApi = (*RefDiff)(nil)
var _ core.PluginModel = (*RefDiff)(nil)
@@ -63,10 +60,6 @@ func (plugin RefDiff) Settings() interface{} {
return nil
}
-func (plugin RefDiff) Init(config *viper.Viper, logger core.Logger, db
*gorm.DB) errors.Error {
- return nil
-}
-
func (plugin RefDiff) SubTaskMetas() []core.SubTaskMeta {
return []core.SubTaskMeta{
tasks.CalculateCommitsDiffMeta,
diff --git a/plugins/tapd/api/init.go b/plugins/tapd/api/init.go
index ef6c55f79..ff2aa65fd 100644
--- a/plugins/tapd/api/init.go
+++ b/plugins/tapd/api/init.go
@@ -21,20 +21,14 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/go-playground/validator/v10"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
-var db *gorm.DB
-var cfg *viper.Viper
var vld *validator.Validate
var connectionHelper *helper.ConnectionApiHelper
var basicRes core.BasicRes
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
- db = database
- cfg = config
- basicRes = helper.NewDefaultBasicRes(cfg, logger, db)
+func Init(br core.BasicRes) {
+ basicRes = br
vld = validator.New()
connectionHelper = helper.NewConnectionHelper(
basicRes,
diff --git a/plugins/tapd/impl/impl.go b/plugins/tapd/impl/impl.go
index c72855203..a29cdb46d 100644
--- a/plugins/tapd/impl/impl.go
+++ b/plugins/tapd/impl/impl.go
@@ -29,8 +29,6 @@ import (
"github.com/apache/incubator-devlake/plugins/tapd/models"
"github.com/apache/incubator-devlake/plugins/tapd/models/migrationscripts"
"github.com/apache/incubator-devlake/plugins/tapd/tasks"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var _ core.PluginMeta = (*Tapd)(nil)
@@ -43,8 +41,8 @@ var _ core.CloseablePluginTask = (*Tapd)(nil)
type Tapd struct{}
-func (plugin Tapd) Init(config *viper.Viper, logger core.Logger, db *gorm.DB)
errors.Error {
- api.Init(config, logger, db)
+func (plugin Tapd) Init(basicRes core.BasicRes) errors.Error {
+ api.Init(basicRes)
return nil
}
diff --git a/plugins/webhook/api/init.go b/plugins/webhook/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/webhook/api/init.go
+++ b/plugins/webhook/api/init.go
@@ -21,16 +21,14 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/go-playground/validator/v10"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var vld *validator.Validate
var connectionHelper *helper.ConnectionApiHelper
var basicRes core.BasicRes
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
- basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+ basicRes = br
vld = validator.New()
connectionHelper = helper.NewConnectionHelper(
basicRes,
diff --git a/plugins/webhook/impl/impl.go b/plugins/webhook/impl/impl.go
index b3dfdcfc6..9cbd822a1 100644
--- a/plugins/webhook/impl/impl.go
+++ b/plugins/webhook/impl/impl.go
@@ -22,8 +22,6 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/webhook/api"
"github.com/apache/incubator-devlake/plugins/webhook/models/migrationscripts"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
// make sure interface is implemented
@@ -39,8 +37,8 @@ func (plugin Webhook) Description() string {
return "collect some Webhook data"
}
-func (plugin Webhook) Init(config *viper.Viper, logger core.Logger, db
*gorm.DB) errors.Error {
- api.Init(config, logger, db)
+func (plugin Webhook) Init(basicRes core.BasicRes) errors.Error {
+ api.Init(basicRes)
return nil
}
diff --git a/plugins/zentao/api/init.go b/plugins/zentao/api/init.go
index 6774e1482..ff2aa65fd 100644
--- a/plugins/zentao/api/init.go
+++ b/plugins/zentao/api/init.go
@@ -21,16 +21,14 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/go-playground/validator/v10"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
var vld *validator.Validate
var connectionHelper *helper.ConnectionApiHelper
var basicRes core.BasicRes
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
- basicRes = helper.NewDefaultBasicRes(config, logger, database)
+func Init(br core.BasicRes) {
+ basicRes = br
vld = validator.New()
connectionHelper = helper.NewConnectionHelper(
basicRes,
diff --git a/plugins/zentao/impl/impl.go b/plugins/zentao/impl/impl.go
index 8b10c1e9e..1c6a0b625 100644
--- a/plugins/zentao/impl/impl.go
+++ b/plugins/zentao/impl/impl.go
@@ -19,6 +19,7 @@ package impl
import (
"fmt"
+
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
@@ -26,8 +27,6 @@ import (
"github.com/apache/incubator-devlake/plugins/zentao/models"
"github.com/apache/incubator-devlake/plugins/zentao/models/migrationscripts"
"github.com/apache/incubator-devlake/plugins/zentao/tasks"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
// make sure interface is implemented
@@ -44,8 +43,8 @@ func (plugin Zentao) Description() string {
return "collect some Zentao data"
}
-func (plugin Zentao) Init(config *viper.Viper, logger core.Logger, db
*gorm.DB) errors.Error {
- api.Init(config, logger, db)
+func (plugin Zentao) Init(basicRes core.BasicRes) errors.Error {
+ api.Init(basicRes)
return nil
}
diff --git a/plugins/azure/api/init.go b/runner/basic_res.go
similarity index 51%
copy from plugins/azure/api/init.go
copy to runner/basic_res.go
index 6774e1482..8bd740fe6 100644
--- a/plugins/azure/api/init.go
+++ b/runner/basic_res.go
@@ -15,25 +15,31 @@ See the License for the specific language governing
permissions and
limitations under the License.
*/
-package api
+package runner
import (
+ "github.com/apache/incubator-devlake/config"
+ "github.com/apache/incubator-devlake/impl"
+ "github.com/apache/incubator-devlake/impl/dalgorm"
+ "github.com/apache/incubator-devlake/logger"
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/plugins/helper"
- "github.com/go-playground/validator/v10"
"github.com/spf13/viper"
"gorm.io/gorm"
)
-var vld *validator.Validate
-var connectionHelper *helper.ConnectionApiHelper
-var basicRes core.BasicRes
+// CreateAppBasicRes returns a application level BasicRes instance based on
.env/environment variables
+// it is useful because multiple places need BasicRes including `main.go`
`directrun` and `worker`
+func CreateAppBasicRes() core.BasicRes {
+ cfg := config.GetConfig()
+ log := logger.Global
+ db, err := NewGormDb(cfg, logger.Global)
+ if err != nil {
+ panic(err)
+ }
+ return CreateBasicRes(cfg, log, db)
+}
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
- basicRes = helper.NewDefaultBasicRes(config, logger, database)
- vld = validator.New()
- connectionHelper = helper.NewConnectionHelper(
- basicRes,
- vld,
- )
+// CreateBasicRes returns a BasicRes based on what was given
+func CreateBasicRes(cfg *viper.Viper, log core.Logger, db *gorm.DB)
core.BasicRes {
+ return impl.NewDefaultBasicRes(cfg, log, dalgorm.NewDalgorm(db))
}
diff --git a/runner/directrun.go b/runner/directrun.go
index 8b805743d..7794aa88c 100644
--- a/runner/directrun.go
+++ b/runner/directrun.go
@@ -19,6 +19,7 @@ package runner
import (
"context"
+ "encoding/json"
goerror "errors"
"fmt"
"io"
@@ -27,10 +28,7 @@ import (
"runtime"
"syscall"
- "github.com/apache/incubator-devlake/config"
- "github.com/apache/incubator-devlake/impl"
- "github.com/apache/incubator-devlake/impl/dalgorm"
- "github.com/apache/incubator-devlake/logger"
+ "github.com/apache/incubator-devlake/models"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/spf13/cobra"
)
@@ -50,18 +48,13 @@ func RunCmd(cmd *cobra.Command) {
// pluginTask: specific built-in plugin, for example: feishu, jira...
// options: plugin config
func DirectRun(cmd *cobra.Command, args []string, pluginTask core.PluginTask,
options map[string]interface{}) {
+ basicRes := CreateAppBasicRes()
tasks, err := cmd.Flags().GetStringSlice("subtasks")
if err != nil {
panic(err)
}
- cfg := config.GetConfig()
- log := logger.Global.Nested(cmd.Use)
- db, err := NewGormDb(cfg, log)
- if err != nil {
- panic(err)
- }
if pluginInit, ok := pluginTask.(core.PluginInit); ok {
- err = pluginInit.Init(cfg, log, db)
+ err = pluginInit.Init(basicRes)
if err != nil {
panic(err)
}
@@ -73,7 +66,7 @@ func DirectRun(cmd *cobra.Command, args []string, pluginTask
core.PluginTask, op
}
// collect migration and run
- migrator, err := InitMigrator(impl.NewDefaultBasicRes(cfg, log,
dalgorm.NewDalgorm(db)))
+ migrator, err := InitMigrator(basicRes)
if err != nil {
panic(err)
}
@@ -85,16 +78,23 @@ func DirectRun(cmd *cobra.Command, args []string,
pluginTask core.PluginTask, op
panic(err)
}
ctx := createContext()
- var parentTaskID uint64
+ optionsJson, err := json.Marshal(options)
+ if err != nil {
+ panic(err)
+ }
+ subtasksJson, err := json.Marshal(tasks)
+ if err != nil {
+ panic(err)
+ }
+ task := &models.Task{
+ Plugin: cmd.Use,
+ Options: optionsJson,
+ Subtasks: subtasksJson,
+ }
err = RunPluginSubTasks(
ctx,
- cfg,
- log,
- db,
- parentTaskID,
- cmd.Use,
- tasks,
- options,
+ basicRes,
+ task,
pluginTask,
nil,
)
diff --git a/runner/loader.go b/runner/loader.go
index ff50ff4c9..20c26f1e6 100644
--- a/runner/loader.go
+++ b/runner/loader.go
@@ -26,12 +26,11 @@ import (
"strings"
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
// LoadPlugins load plugins from local directory
-func LoadPlugins(pluginsDir string, config *viper.Viper, logger core.Logger,
db *gorm.DB) errors.Error {
+func LoadPlugins(basicRes core.BasicRes) errors.Error {
+ pluginsDir := basicRes.GetConfig("PLUGIN_DIR")
walkErr := filepath.WalkDir(pluginsDir, func(path string, d
fs.DirEntry, err error) error {
if err != nil {
return err
@@ -52,7 +51,7 @@ func LoadPlugins(pluginsDir string, config *viper.Viper,
logger core.Logger, db
return errors.Default.New(fmt.Sprintf("%s
PluginEntry must implement PluginMeta interface", pluginName))
}
if plugin, ok := symPluginEntry.(core.PluginInit); ok {
- err = plugin.Init(config, logger, db)
+ err = plugin.Init(basicRes)
if err != nil {
return err
}
@@ -62,7 +61,7 @@ func LoadPlugins(pluginsDir string, config *viper.Viper,
logger core.Logger, db
return nil
}
- logger.Info(`plugin loaded %s`, pluginName)
+ basicRes.GetLogger().Info(`plugin loaded %s`,
pluginName)
}
return nil
})
diff --git a/runner/run_pipeline.go b/runner/run_pipeline.go
index 32b85d69a..f7d6f491e 100644
--- a/runner/run_pipeline.go
+++ b/runner/run_pipeline.go
@@ -23,23 +23,25 @@ import (
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/models"
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/spf13/viper"
- "gorm.io/gorm"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
)
// RunPipeline FIXME ...
func RunPipeline(
- _ *viper.Viper,
- log core.Logger,
- db *gorm.DB,
+ basicRes core.BasicRes,
pipelineId uint64,
runTasks func([]uint64) errors.Error,
) errors.Error {
// load tasks for pipeline
+ db := basicRes.GetDal()
var tasks []models.Task
- err := db.Where("pipeline_id = ? AND status = ?", pipelineId,
models.TASK_CREATED).Order("pipeline_row, pipeline_col").Find(&tasks).Error
+ err := db.All(
+ &tasks,
+ dal.Where("pipeline_id = ? AND status = ?", pipelineId,
models.TASK_CREATED),
+ dal.Orderby("pipeline_row, pipeline_col"),
+ )
if err != nil {
- return errors.Convert(err)
+ return err
}
taskIds := make([][]uint64, 0)
for _, task := range tasks {
@@ -48,31 +50,32 @@ func RunPipeline(
}
taskIds[task.PipelineRow-1] =
append(taskIds[task.PipelineRow-1], task.ID)
}
- return runPipelineTasks(log, db, pipelineId, taskIds, runTasks)
+ return runPipelineTasks(basicRes, pipelineId, taskIds, runTasks)
}
func runPipelineTasks(
- log core.Logger,
- db *gorm.DB,
+ basicRes core.BasicRes,
pipelineId uint64,
taskIds [][]uint64,
runTasks func([]uint64) errors.Error,
) errors.Error {
+ db := basicRes.GetDal()
+ log := basicRes.GetLogger()
// load pipeline from db
dbPipeline := &models.DbPipeline{}
- err := db.Find(dbPipeline, pipelineId).Error
+ err := db.First(dbPipeline, dal.Where("id = ?", pipelineId))
if err != nil {
- return errors.Convert(err)
+ return err
}
// This double for loop executes each set of tasks sequentially while
// executing the set of tasks concurrently.
for i, row := range taskIds {
// update stage
- err = db.Model(dbPipeline).Updates(map[string]interface{}{
- "status": models.TASK_RUNNING,
- "stage": i + 1,
- }).Error
+ err = db.UpdateColumns(dbPipeline, []dal.DalSet{
+ {ColumnName: "status", Value: models.TASK_RUNNING},
+ {ColumnName: "stage", Value: i + 1},
+ })
if err != nil {
log.Error(err, "update pipeline state failed")
break
@@ -81,18 +84,16 @@ func runPipelineTasks(
err = runTasks(row)
if err != nil {
log.Error(err, "run tasks failed")
- return errors.Convert(err)
+ return err
}
// update finishedTasks
- err = db.Model(dbPipeline).Updates(map[string]interface{}{
- "finished_tasks": gorm.Expr("finished_tasks + ?",
len(row)),
- }).Error
+ err = db.UpdateColumn(dbPipeline, "finished_tasks",
dal.Expr("finished_tasks + ?", len(row)))
if err != nil {
log.Error(err, "update pipeline state failed")
- return errors.Convert(err)
+ return err
}
}
log.Info("pipeline finished in %d ms: %v",
time.Now().UnixMilli()-dbPipeline.BeganAt.UnixMilli(), err)
- return errors.Convert(err)
+ return err
}
diff --git a/runner/run_task.go b/runner/run_task.go
index 5a59d4f9f..09528a6cc 100644
--- a/runner/run_task.go
+++ b/runner/run_task.go
@@ -19,45 +19,42 @@ package runner
import (
"context"
- "encoding/json"
"fmt"
+ "time"
+
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/logger"
- "time"
- "github.com/apache/incubator-devlake/config"
"github.com/apache/incubator-devlake/models"
"github.com/apache/incubator-devlake/utils"
- "github.com/spf13/viper"
- "gorm.io/gorm"
"github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
)
// RunTask FIXME ...
func RunTask(
ctx context.Context,
- _ *viper.Viper,
- parentLogger core.Logger,
- db *gorm.DB,
+ basicRes core.BasicRes,
progress chan core.RunningProgress,
taskId uint64,
) (err errors.Error) {
+ db := basicRes.GetDal()
task := &models.Task{}
- if err := db.Find(task, taskId).Error; err != nil {
- return errors.Convert(err)
+ if err := db.First(task, dal.Where("id = ?", taskId)); err != nil {
+ return err
}
if task.Status == models.TASK_COMPLETED {
return errors.Default.New("invalid task status")
}
dbPipeline := &models.DbPipeline{}
- if err := db.Find(dbPipeline, task.PipelineId).Error; err != nil {
- return errors.Convert(err)
+ if err := db.First(dbPipeline, dal.Where("id = ? ", task.PipelineId));
err != nil {
+ return err
}
- log, err := getTaskLogger(parentLogger, task)
+ log, err := getTaskLogger(basicRes.GetLogger(), task)
if err != nil {
- return errors.Convert(err)
+ return err
}
beganAt := time.Now()
// make sure task status always correct even if it panicked
@@ -86,23 +83,23 @@ func RunTask(
} else {
lakeErr = errors.Convert(err)
}
- dbe := db.Model(task).Updates(map[string]interface{}{
- "status": models.TASK_FAILED,
- "message": lakeErr.Messages().Format(),
- "finished_at": finishedAt,
- "spent_seconds": spentSeconds,
- "failed_sub_task": subTaskName,
- }).Error
+ dbe := db.UpdateColumns(task, []dal.DalSet{
+ {ColumnName: "status", Value:
models.TASK_FAILED},
+ {ColumnName: "message", Value:
lakeErr.Messages().Format()},
+ {ColumnName: "finished_at", Value: finishedAt},
+ {ColumnName: "spent_seconds", Value:
spentSeconds},
+ {ColumnName: "failed_sub_task", Value:
subTaskName},
+ })
if dbe != nil {
log.Error(err, "failed to finalize task status
into db (task failed)")
}
} else {
- dbe := db.Model(task).Updates(map[string]interface{}{
- "status": models.TASK_COMPLETED,
- "message": "",
- "finished_at": finishedAt,
- "spent_seconds": spentSeconds,
- }).Error
+ dbe := db.UpdateColumns(task, []dal.DalSet{
+ {ColumnName: "status", Value:
models.TASK_COMPLETED},
+ {ColumnName: "message", Value: ""},
+ {ColumnName: "finished_at", Value: finishedAt},
+ {ColumnName: "spent_seconds", Value:
spentSeconds},
+ })
if dbe != nil {
log.Error(err, "failed to finalize task status
into db (task succeeded)")
}
@@ -111,34 +108,19 @@ func RunTask(
// start execution
log.Info("start executing task: %d", task.ID)
- if err := db.Model(task).Updates(map[string]interface{}{
- "status": models.TASK_RUNNING,
- "message": "",
- "began_at": beganAt,
- }).Error; err != nil {
- return errors.Convert(err)
- }
-
- var options map[string]interface{}
- err = errors.Convert(json.Unmarshal(task.Options, &options))
- if err != nil {
- return err
- }
- var subtasks []string
- err = errors.Convert(json.Unmarshal(task.Subtasks, &subtasks))
- if err != nil {
- return err
+ dbe := db.UpdateColumns(task, []dal.DalSet{
+ {ColumnName: "status", Value: models.TASK_RUNNING},
+ {ColumnName: "message", Value: ""},
+ {ColumnName: "began_at", Value: beganAt},
+ })
+ if dbe != nil {
+ return dbe
}
err = RunPluginTask(
ctx,
- config.GetConfig(),
- log.Nested(task.Plugin),
- db,
- task.ID,
- task.Plugin,
- subtasks,
- options,
+ basicRes.ReplaceLogger(log),
+ task,
progress,
)
if dbPipeline.SkipOnFail {
@@ -150,32 +132,22 @@ func RunTask(
// RunPluginTask FIXME ...
func RunPluginTask(
ctx context.Context,
- cfg *viper.Viper,
- log core.Logger,
- db *gorm.DB,
- taskID uint64,
- name string,
- subtasks []string,
- options map[string]interface{},
+ basicRes core.BasicRes,
+ task *models.Task,
progress chan core.RunningProgress,
) errors.Error {
- pluginMeta, err := core.GetPlugin(name)
+ pluginMeta, err := core.GetPlugin(task.Plugin)
if err != nil {
return errors.Default.WrapRaw(err)
}
pluginTask, ok := pluginMeta.(core.PluginTask)
if !ok {
- return errors.Default.New(fmt.Sprintf("plugin %s doesn't
support PluginTask interface", name))
+ return errors.Default.New(fmt.Sprintf("plugin %s doesn't
support PluginTask interface", task.Plugin))
}
return RunPluginSubTasks(
ctx,
- cfg,
- log,
- db,
- taskID,
- name,
- subtasks,
- options,
+ basicRes,
+ task,
pluginTask,
progress,
)
@@ -184,16 +156,12 @@ func RunPluginTask(
// RunPluginSubTasks FIXME ...
func RunPluginSubTasks(
ctx context.Context,
- cfg *viper.Viper,
- log core.Logger,
- db *gorm.DB,
- taskID uint64,
- name string,
- subtaskNames []string,
- options map[string]interface{},
+ basicRes core.BasicRes,
+ task *models.Task,
pluginTask core.PluginTask,
progress chan core.RunningProgress,
) errors.Error {
+ log := basicRes.GetLogger()
log.Info("start plugin")
// find out all possible subtasks this plugin can offer
subtaskMetas := pluginTask.SubTaskMetas()
@@ -210,6 +178,10 @@ func RunPluginSubTasks(
*/
// user specifies what subtasks to run
+ subtaskNames, err := task.GetSubTasks()
+ if err != nil {
+ return err
+ }
if len(subtaskNames) != 0 {
// decode user specified subtasks
var specifiedTasks []string
@@ -248,13 +220,17 @@ func RunPluginSubTasks(
}
}
- taskCtx := helper.NewDefaultTaskContext(ctx, cfg, log, db, name,
subtasksFlag, progress)
+ taskCtx := helper.NewDefaultTaskContext(ctx, basicRes, task.Plugin,
subtasksFlag, progress)
if closeablePlugin, ok := pluginTask.(core.CloseablePluginTask); ok {
defer closeablePlugin.Close(taskCtx)
}
+ options, err := task.GetOptions()
+ if err != nil {
+ return err
+ }
taskData, err := pluginTask.PrepareTaskData(taskCtx, options)
if err != nil {
- return errors.Default.Wrap(err, fmt.Sprintf("error preparing
task data for %s", name))
+ return errors.Default.Wrap(err, fmt.Sprintf("error preparing
task data for %s", task.Plugin))
}
taskCtx.SetData(taskData)
@@ -282,7 +258,7 @@ func RunPluginSubTasks(
SubTaskNumber: subtaskNumber,
}
}
- err = runSubtask(log, db, taskID, subtaskNumber, subtaskCtx,
subtaskMeta.EntryPoint)
+ err = runSubtask(basicRes, subtaskCtx, task.ID, subtaskNumber,
subtaskMeta.EntryPoint)
if err != nil {
err = errors.SubtaskErr.Wrap(err, fmt.Sprintf("subtask
%s ended unexpectedly", subtaskMeta.Name), errors.WithData(&subtaskMeta))
log.Error(err, "")
@@ -295,7 +271,7 @@ func RunPluginSubTasks(
}
// UpdateProgressDetail FIXME ...
-func UpdateProgressDetail(db *gorm.DB, log core.Logger, taskId uint64,
progressDetail *models.TaskProgressDetail, p *core.RunningProgress) {
+func UpdateProgressDetail(basicRes core.BasicRes, taskId uint64,
progressDetail *models.TaskProgressDetail, p *core.RunningProgress) {
task := &models.Task{}
task.ID = taskId
switch p.Type {
@@ -306,9 +282,9 @@ func UpdateProgressDetail(db *gorm.DB, log core.Logger,
taskId uint64, progressD
progressDetail.FinishedSubTasks = p.Current
// TODO: get rid of db update
pct := float32(p.Current) / float32(p.Total)
- err := db.Model(task).Update("progress", pct).Error
+ err := basicRes.GetDal().UpdateColumn(task, "progress", pct)
if err != nil {
- log.Error(err, "failed to update progress")
+ basicRes.GetLogger().Error(err, "failed to update
progress")
}
case core.SubTaskSetProgress:
progressDetail.TotalRecords = p.Total
@@ -322,11 +298,10 @@ func UpdateProgressDetail(db *gorm.DB, log core.Logger,
taskId uint64, progressD
}
func runSubtask(
- log core.Logger,
- db *gorm.DB,
+ basicRes core.BasicRes,
+ ctx core.SubTaskContext,
parentID uint64,
subtaskNumber int,
- ctx core.SubTaskContext,
entryPoint core.SubTaskEntryPoint,
) errors.Error {
beginAt := time.Now()
@@ -340,14 +315,14 @@ func runSubtask(
finishedAt := time.Now()
subtask.FinishedAt = &finishedAt
subtask.SpentSeconds = finishedAt.Unix() - beginAt.Unix()
- recordSubtask(log, db, subtask)
+ recordSubtask(basicRes, subtask)
}()
return entryPoint(ctx)
}
-func recordSubtask(log core.Logger, db *gorm.DB, subtask *models.Subtask) {
- if err := db.Create(&subtask).Error; err != nil {
- log.Error(err, "error writing subtask %d status to DB: %v",
subtask.ID)
+func recordSubtask(basicRes core.BasicRes, subtask *models.Subtask) {
+ if err := basicRes.GetDal().Create(subtask); err != nil {
+ basicRes.GetLogger().Error(err, "error writing subtask %d
status to DB: %v", subtask.ID)
}
}
diff --git a/scripts/compile-plugins.sh b/scripts/compile-plugins.sh
index b2ea0f06b..ea220924e 100755
--- a/scripts/compile-plugins.sh
+++ b/scripts/compile-plugins.sh
@@ -20,21 +20,19 @@
# make dev
#
# compile specific plugin and fire up api server:
-# PLUGIN=<PLUGIN_NAME> make dev
-# PLUGIN=<PLUGIN_NAME> PLUGIN2=<PLUGIN_NAME2> make dev
+# PLUGIN=<PLUGIN_NAME[,PLUGIN_NAME2]> make dev
#
# compile all plugins and fire up api server in DEBUG MODE with `delve`:
# make debug
#
# compile specific plugin and fire up api server in DEBUG MODE with `delve`:
-# PLUGIN=<PLUGIN_NAME> make dev
-# PLUGIN=<PLUGIN_NAME> PLUGIN2=<PLUGIN_NAME> make dev
+# PLUGIN=<PLUGIN_NAME[,PLUGIN_NAME2]> make debug
set -e
echo "Usage: "
echo " build all plugins: $0 [golang build flags...]"
-echo " build and keep one plugin only: PLUGIN=github,jira $0 [golang build
flags...]"
+echo " build and keep specified plugins only: PLUGIN=github,jira $0 [golang
build flags...]"
SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
PLUGIN_SRC_DIR=$SCRIPT_DIR/../plugins
diff --git a/services/init.go b/services/init.go
index 27e8f91c5..204d548ee 100644
--- a/services/init.go
+++ b/services/init.go
@@ -23,7 +23,6 @@ import (
"sync"
"github.com/apache/incubator-devlake/errors"
- "github.com/apache/incubator-devlake/impl"
"github.com/apache/incubator-devlake/config"
"github.com/apache/incubator-devlake/impl/dalgorm"
@@ -50,28 +49,22 @@ const failToCreateCronJob = "created cron job failed"
// Init the services module
func Init() {
var err error
- // basic resources initialization
cfg = config.GetConfig()
log = logger.Global
- db, err = runner.NewGormDb(cfg, logger.Global.Nested("db"))
- if err != nil {
- panic(err)
- }
- // gorm doesn't support creating a PrepareStmt=false session from a
PrepareStmt=true
- // but the lockDatabase needs PrepareStmt=false for table locking, we
have to deal with it here
- lockingDb, err := runner.NewGormDbEx(cfg,
logger.Global.Nested("migrator db"), &dal.SessionConfig{
- PrepareStmt: false,
- SkipDefaultTransaction: true,
- })
- if err != nil {
- panic(err)
- }
- err = lockDatabase(dalgorm.NewDalgorm(lockingDb))
+ db, err = runner.NewGormDb(cfg, log)
if err != nil {
panic(err)
}
- basicRes = impl.NewDefaultBasicRes(cfg, log, dalgorm.NewDalgorm(db))
+ // TODO: this is ugly, the lockDb / CreateAppBasicRes are coupled via
global variables cfg/log
+ // it is too much for this refactor, let's solve it later
+
+ // lock the database to avoid multiple devlake instances from sharing
the same one
+ lockDb()
+
+ // basic resources initialization
+ basicRes = runner.CreateBasicRes(cfg, log, db)
+
// initialize db migrator
migrator, err = runner.InitMigrator(basicRes)
if err != nil {
@@ -82,12 +75,7 @@ func Init() {
// now,
// load plugins
- err = runner.LoadPlugins(
- cfg.GetString("PLUGIN_DIR"),
- cfg,
- logger.Global.Nested("plugin"),
- db,
- )
+ err = runner.LoadPlugins(basicRes)
if err != nil {
panic(err)
}
@@ -129,3 +117,19 @@ func ExecuteMigration() errors.Error {
func MigrationRequireConfirmation() bool {
return migrator.HasPendingScripts()
}
+
+func lockDb() {
+ // gorm doesn't support creating a PrepareStmt=false session from a
PrepareStmt=true
+ // but the lockDatabase needs PrepareStmt=false for table locking, we
have to deal with it here
+ lockingDb, err := runner.NewGormDbEx(cfg,
logger.Global.Nested("migrator db"), &dal.SessionConfig{
+ PrepareStmt: false,
+ SkipDefaultTransaction: true,
+ })
+ if err != nil {
+ panic(err)
+ }
+ err = lockDatabase(dalgorm.NewDalgorm(lockingDb))
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/services/pipeline_runner.go b/services/pipeline_runner.go
index 8eea52ebb..2d61bb970 100644
--- a/services/pipeline_runner.go
+++ b/services/pipeline_runner.go
@@ -39,9 +39,7 @@ type pipelineRunner struct {
func (p *pipelineRunner) runPipelineStandalone() errors.Error {
return runner.RunPipeline(
- cfg,
- p.logger,
- db,
+ runner.CreateBasicRes(cfg, p.logger, db),
p.pipeline.ID,
func(taskIds []uint64) errors.Error {
return RunTasksStandalone(p.logger, taskIds)
diff --git a/services/task.go b/services/task.go
index f81db2cba..f7e1eaa68 100644
--- a/services/task.go
+++ b/services/task.go
@@ -322,9 +322,7 @@ func runTaskStandalone(parentLog core.Logger, taskId
uint64) errors.Error {
go updateTaskProgress(taskId, progress)
err = runner.RunTask(
ctx,
- cfg,
- parentLog,
- db,
+ runner.CreateBasicRes(cfg, parentLog, db),
progress,
taskId,
)
@@ -347,7 +345,7 @@ func updateTaskProgress(taskId uint64, progress chan
core.RunningProgress) {
progressDetail := data.ProgressDetail
for p := range progress {
runningTasks.mu.Lock()
- runner.UpdateProgressDetail(db, log, taskId, progressDetail, &p)
+ runner.UpdateProgressDetail(basicRes, taskId, progressDetail,
&p)
runningTasks.mu.Unlock()
}
}
diff --git a/worker/app/pipeline_workflow.go b/worker/app/pipeline_workflow.go
index d3317c113..2943beb3e 100644
--- a/worker/app/pipeline_workflow.go
+++ b/worker/app/pipeline_workflow.go
@@ -30,15 +30,14 @@ import (
// DevLakePipelineWorkflow FIXME ...
func DevLakePipelineWorkflow(ctx workflow.Context, configJson []byte,
pipelineId uint64, loggerConfig *core.LoggerConfig) errors.Error {
- cfg, log, db, err := loadResources(configJson, loggerConfig)
+ basicRes, err := loadResources(configJson, loggerConfig)
if err != nil {
return errors.Convert(err)
}
+ log := basicRes.GetLogger()
log.Info("received pipeline #%d", pipelineId)
err = runner.RunPipeline(
- cfg,
- log,
- db,
+ basicRes,
pipelineId,
func(taskIds []uint64) errors.Error {
return runTasks(ctx, configJson, taskIds, log)
diff --git a/worker/app/shared.go b/worker/app/shared.go
index a46090995..8fe17484c 100644
--- a/worker/app/shared.go
+++ b/worker/app/shared.go
@@ -19,33 +19,34 @@ package app
import (
"bytes"
+
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/logger"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/runner"
"github.com/spf13/viper"
- "gorm.io/gorm"
)
-func loadResources(configJson []byte, loggerConfig *core.LoggerConfig)
(*viper.Viper, core.Logger, *gorm.DB, errors.Error) {
+func loadResources(configJson []byte, loggerConfig *core.LoggerConfig)
(core.BasicRes, errors.Error) {
+ // TODO: should be redirected to server
+ globalLogger := logger.Global.Nested("worker")
// prepare
cfg := viper.New()
cfg.SetConfigType("json")
err := cfg.ReadConfig(bytes.NewBuffer(configJson))
if err != nil {
- return nil, nil, nil, errors.Convert(err)
+ globalLogger.Error(err, "failed to load resources")
+ return nil, errors.Convert(err)
}
- // TODO: should be redirected to server
- globalLogger := logger.Global.Nested("worker")
db, err := runner.NewGormDb(cfg, globalLogger)
if err != nil {
- return nil, nil, nil, errors.Convert(err)
+ return nil, errors.Convert(err)
}
log, err := getWorkerLogger(globalLogger, loggerConfig)
if err != nil {
- return nil, nil, nil, errors.Convert(err)
+ return nil, errors.Convert(err)
}
- return cfg, log, db, errors.Convert(err)
+ return runner.CreateBasicRes(cfg, log, db), nil
}
func getWorkerLogger(log core.Logger, logConfig *core.LoggerConfig)
(core.Logger, errors.Error) {
diff --git a/worker/app/task_activity.go b/worker/app/task_activity.go
index b3781be42..c5cfc3734 100644
--- a/worker/app/task_activity.go
+++ b/worker/app/task_activity.go
@@ -19,6 +19,7 @@ package app
import (
"context"
+
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/models"
"github.com/apache/incubator-devlake/plugins/core"
@@ -28,21 +29,22 @@ import (
// DevLakeTaskActivity FIXME ...
func DevLakeTaskActivity(ctx context.Context, configJson []byte, taskId
uint64, loggerConfig *core.LoggerConfig) errors.Error {
- cfg, log, db, err := loadResources(configJson, loggerConfig)
+ basicRes, err := loadResources(configJson, loggerConfig)
if err != nil {
return err
}
+ log := basicRes.GetLogger()
log.Info("received task #%d", taskId)
progressDetail := &models.TaskProgressDetail{}
progChan := make(chan core.RunningProgress)
defer close(progChan)
go func() {
for p := range progChan {
- runner.UpdateProgressDetail(db, log, taskId,
progressDetail, &p)
+ runner.UpdateProgressDetail(basicRes, taskId,
progressDetail, &p)
activity.RecordHeartbeat(ctx, progressDetail)
}
}()
- err = runner.RunTask(ctx, cfg, log, db, progChan, taskId)
+ err = runner.RunTask(ctx, basicRes, progChan, taskId)
if err != nil {
log.Error(err, "failed to execute task #%d", taskId)
}
diff --git a/worker/main.go b/worker/main.go
index d3bd6a754..b98911b94 100644
--- a/worker/main.go
+++ b/worker/main.go
@@ -18,10 +18,10 @@ limitations under the License.
package main
import (
- "github.com/apache/incubator-devlake/errors"
"log"
- "github.com/apache/incubator-devlake/config"
+ "github.com/apache/incubator-devlake/errors"
+
"github.com/apache/incubator-devlake/logger"
"github.com/apache/incubator-devlake/runner"
_ "github.com/apache/incubator-devlake/version"
@@ -31,22 +31,17 @@ import (
)
func main() {
- // basic resources
- cfg := config.GetConfig()
- db, err := runner.NewGormDb(cfg, logger.Global)
- if err != nil {
- panic(err)
- }
- err = runner.LoadPlugins(cfg.GetString("PLUGIN_DIR"), cfg,
logger.Global, db)
+ basicRes := runner.CreateAppBasicRes()
+ err := runner.LoadPlugins(basicRes)
if err != nil {
panic(err)
}
// establish temporal connection
- TASK_QUEUE := cfg.GetString("TEMPORAL_TASK_QUEUE")
+ TASK_QUEUE := basicRes.GetConfig("TEMPORAL_TASK_QUEUE")
// Create the client object just once per process
c, err := errors.Convert01(client.NewClient(client.Options{
- HostPort: cfg.GetString("TEMPORAL_URL"),
+ HostPort: basicRes.GetConfig("TEMPORAL_URL"),
Logger: app.NewTemporalLogger(logger.Global),
}))
if err != nil {