This is an automated email from the ASF dual-hosted git repository. lynwee pushed a commit to branch dev-1 in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit 2e69df0bc44a6c2f8d264c564bd0379ba9c74c79 Author: d4x1 <[email protected]> AuthorDate: Sat Sep 14 15:34:02 2024 +0800 feat(pipeline): check pipelines' tokens before executing/creating pipeline --- backend/core/plugin/plugin_api.go | 4 ++ backend/core/plugin/plugin_blueprint.go | 10 +++ backend/helpers/pluginhelper/api/misc_helpers.go | 10 +++ backend/plugins/ae/impl/impl.go | 5 ++ backend/plugins/azuredevops_go/impl/impl.go | 5 ++ backend/plugins/bamboo/impl/impl.go | 5 ++ backend/plugins/bitbucket/impl/impl.go | 5 ++ backend/plugins/bitbucket_server/impl/impl.go | 5 ++ backend/plugins/circleci/impl/impl.go | 5 ++ backend/plugins/customize/impl/impl.go | 4 ++ backend/plugins/dbt/impl/impl.go | 4 ++ backend/plugins/dora/impl/impl.go | 5 +- backend/plugins/feishu/impl/impl.go | 5 ++ backend/plugins/gitee/impl/impl.go | 5 ++ backend/plugins/gitextractor/impl/impl.go | 4 ++ backend/plugins/github/impl/impl.go | 5 ++ backend/plugins/gitlab/impl/impl.go | 5 ++ backend/plugins/icla/impl/impl.go | 4 ++ backend/plugins/issue_trace/impl/enricher.go | 5 +- backend/plugins/jenkins/impl/impl.go | 5 ++ backend/plugins/jira/impl/impl.go | 5 ++ backend/plugins/linker/impl/impl.go | 4 ++ backend/plugins/opsgenie/impl/impl.go | 5 ++ backend/plugins/org/impl/impl.go | 22 +++++++ backend/plugins/org/tasks/check_token.go | 76 ++++++++++++++++++++++ backend/plugins/org/tasks/task_data.go | 10 ++- backend/plugins/pagerduty/impl/impl.go | 5 ++ backend/plugins/refdiff/impl/impl.go | 4 ++ backend/plugins/slack/impl/impl.go | 5 ++ backend/plugins/sonarqube/impl/impl.go | 5 ++ backend/plugins/starrocks/impl/impl.go | 4 ++ backend/plugins/tapd/impl/impl.go | 5 ++ backend/plugins/teambition/impl/impl.go | 5 ++ backend/plugins/trello/impl/impl.go | 5 ++ backend/plugins/webhook/impl/impl.go | 4 ++ backend/plugins/zentao/impl/impl.go | 5 ++ backend/server/services/blueprint.go | 75 +++++++++++++++++++++ backend/server/services/blueprint_makeplan_v200.go | 15 +++++ backend/server/services/project.go | 2 +- .../server/services/remote/models/plugin_remote.go | 1 + .../server/services/remote/plugin/plugin_impl.go | 5 ++ 41 files changed, 367 insertions(+), 5 deletions(-) diff --git a/backend/core/plugin/plugin_api.go b/backend/core/plugin/plugin_api.go index f9195d2ee..ea41a4895 100644 --- a/backend/core/plugin/plugin_api.go +++ b/backend/core/plugin/plugin_api.go @@ -73,6 +73,10 @@ type PluginApi interface { ApiResources() map[string]map[string]ApiResourceHandler } +type PluginTestConnectionAPI interface { + TestConnection(id uint64) errors.Error +} + const wrapResponseError = "WRAP_RESPONSE_ERROR" func WrapTestConnectionErrResp(basicRes context.BasicRes, err errors.Error) errors.Error { diff --git a/backend/core/plugin/plugin_blueprint.go b/backend/core/plugin/plugin_blueprint.go index b722bb9e2..a24513564 100644 --- a/backend/core/plugin/plugin_blueprint.go +++ b/backend/core/plugin/plugin_blueprint.go @@ -93,6 +93,16 @@ type ProjectMapper interface { MapProject(projectName string, scopes []Scope) (models.PipelinePlan, errors.Error) } +type ProjectTokenCheckerConnection struct { + PluginName string + ConnectionId uint64 +} + +// ProjectTokenChecker is implemented by the plugin org, which generate a task tp check all connection's tokens +type ProjectTokenChecker interface { + MakePipeline(skipCollectors bool, projectName string, scopes []ProjectTokenCheckerConnection) (models.PipelinePlan, errors.Error) +} + // CompositeDataSourcePluginBlueprintV200 is for unit test type CompositeDataSourcePluginBlueprintV200 interface { PluginMeta diff --git a/backend/helpers/pluginhelper/api/misc_helpers.go b/backend/helpers/pluginhelper/api/misc_helpers.go index e7315ceab..3dcf8778c 100644 --- a/backend/helpers/pluginhelper/api/misc_helpers.go +++ b/backend/helpers/pluginhelper/api/misc_helpers.go @@ -18,9 +18,11 @@ limitations under the License. package api import ( + "fmt" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models" + "github.com/apache/incubator-devlake/core/plugin" ) // CallDB wraps DB calls with this signature, and handles the case if the struct is wrapped in a models.DynamicTabler. @@ -31,3 +33,11 @@ func CallDB(f func(any, ...dal.Clause) errors.Error, x any, clauses ...dal.Claus } return f(x, clauses...) } + +func GenerateTestingConnectionApiResourceInput(connectionID uint64) *plugin.ApiResourceInput { + return &plugin.ApiResourceInput{ + Params: map[string]string{ + "connectionId": fmt.Sprintf("%d", connectionID), + }, + } +} diff --git a/backend/plugins/ae/impl/impl.go b/backend/plugins/ae/impl/impl.go index 389857e1f..7356d9e0e 100644 --- a/backend/plugins/ae/impl/impl.go +++ b/backend/plugins/ae/impl/impl.go @@ -130,6 +130,11 @@ func (p AE) MigrationScripts() []plugin.MigrationScript { return migrationscripts.All() } +func (p AE) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p AE) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/plugins/azuredevops_go/impl/impl.go b/backend/plugins/azuredevops_go/impl/impl.go index c567e49f5..2aecfde51 100644 --- a/backend/plugins/azuredevops_go/impl/impl.go +++ b/backend/plugins/azuredevops_go/impl/impl.go @@ -255,6 +255,11 @@ func (p Azuredevops) ApiResources() map[string]map[string]plugin.ApiResourceHand } } +func (p Azuredevops) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p Azuredevops) MakeDataSourcePipelinePlanV200( connectionId uint64, scopes []*coreModels.BlueprintScope, diff --git a/backend/plugins/bamboo/impl/impl.go b/backend/plugins/bamboo/impl/impl.go index 2aae7ad5e..7f745c6dc 100644 --- a/backend/plugins/bamboo/impl/impl.go +++ b/backend/plugins/bamboo/impl/impl.go @@ -209,6 +209,11 @@ func (p Bamboo) MigrationScripts() []plugin.MigrationScript { return migrationscripts.All() } +func (p Bamboo) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p Bamboo) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/plugins/bitbucket/impl/impl.go b/backend/plugins/bitbucket/impl/impl.go index 9a23ab1b3..7eeaf98fa 100644 --- a/backend/plugins/bitbucket/impl/impl.go +++ b/backend/plugins/bitbucket/impl/impl.go @@ -207,6 +207,11 @@ func (p Bitbucket) MakeDataSourcePipelinePlanV200( return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes, skipCollectors) } +func (p Bitbucket) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p Bitbucket) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/plugins/bitbucket_server/impl/impl.go b/backend/plugins/bitbucket_server/impl/impl.go index 18b4a4679..fe446d76f 100644 --- a/backend/plugins/bitbucket_server/impl/impl.go +++ b/backend/plugins/bitbucket_server/impl/impl.go @@ -170,6 +170,11 @@ func (p BitbucketServer) MakeDataSourcePipelinePlanV200( return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes, skipCollectors) } +func (p BitbucketServer) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p BitbucketServer) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "connections/:connectionId/test": { diff --git a/backend/plugins/circleci/impl/impl.go b/backend/plugins/circleci/impl/impl.go index 8e052711a..0af6b7db4 100644 --- a/backend/plugins/circleci/impl/impl.go +++ b/backend/plugins/circleci/impl/impl.go @@ -172,6 +172,11 @@ func (p Circleci) MigrationScripts() []plugin.MigrationScript { return migrationscripts.All() } +func (p Circleci) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p Circleci) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/plugins/customize/impl/impl.go b/backend/plugins/customize/impl/impl.go index facb0515d..59b967aa5 100644 --- a/backend/plugins/customize/impl/impl.go +++ b/backend/plugins/customize/impl/impl.go @@ -110,3 +110,7 @@ func (p Customize) ApiResources() map[string]map[string]plugin.ApiResourceHandle }, } } + +func (p Customize) TestConnection(id uint64) errors.Error { + return nil +} diff --git a/backend/plugins/dbt/impl/impl.go b/backend/plugins/dbt/impl/impl.go index 88eafa9f6..55b7c2918 100644 --- a/backend/plugins/dbt/impl/impl.go +++ b/backend/plugins/dbt/impl/impl.go @@ -74,3 +74,7 @@ func (p Dbt) RootPkgPath() string { func (p Dbt) Name() string { return "dbt" } + +func (p Dbt) TestConnection(id uint64) errors.Error { + return nil +} diff --git a/backend/plugins/dora/impl/impl.go b/backend/plugins/dora/impl/impl.go index 246097398..0d41a94eb 100644 --- a/backend/plugins/dora/impl/impl.go +++ b/backend/plugins/dora/impl/impl.go @@ -19,7 +19,6 @@ package impl import ( "encoding/json" - "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" coreModels "github.com/apache/incubator-devlake/core/models" @@ -168,3 +167,7 @@ func (p Dora) MakeMetricPluginPipelinePlanV200(projectName string, options json. } return plan, nil } + +func (p Dora) TestConnection(id uint64) errors.Error { + return nil +} diff --git a/backend/plugins/feishu/impl/impl.go b/backend/plugins/feishu/impl/impl.go index 92e0f1571..3158b5f1f 100644 --- a/backend/plugins/feishu/impl/impl.go +++ b/backend/plugins/feishu/impl/impl.go @@ -132,6 +132,11 @@ func (p Feishu) MigrationScripts() []plugin.MigrationScript { return migrationscripts.All() } +func (p Feishu) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p Feishu) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/plugins/gitee/impl/impl.go b/backend/plugins/gitee/impl/impl.go index 35be389e7..b422fbafc 100644 --- a/backend/plugins/gitee/impl/impl.go +++ b/backend/plugins/gitee/impl/impl.go @@ -187,6 +187,11 @@ func (p Gitee) MigrationScripts() []plugin.MigrationScript { return migrationscripts.All() } +func (p Gitee) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p Gitee) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/plugins/gitextractor/impl/impl.go b/backend/plugins/gitextractor/impl/impl.go index 468e51734..45c84ab6e 100644 --- a/backend/plugins/gitextractor/impl/impl.go +++ b/backend/plugins/gitextractor/impl/impl.go @@ -122,3 +122,7 @@ func (p GitExtractor) Close(taskCtx plugin.TaskContext) errors.Error { func (p GitExtractor) RootPkgPath() string { return "github.com/apache/incubator-devlake/plugins/gitextractor" } + +func (p GitExtractor) TestConnection(id uint64) errors.Error { + return nil +} diff --git a/backend/plugins/github/impl/impl.go b/backend/plugins/github/impl/impl.go index 07342b786..e9f1469af 100644 --- a/backend/plugins/github/impl/impl.go +++ b/backend/plugins/github/impl/impl.go @@ -180,6 +180,11 @@ func (p Github) MigrationScripts() []plugin.MigrationScript { return migrationscripts.All() } +func (p Github) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p Github) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/plugins/gitlab/impl/impl.go b/backend/plugins/gitlab/impl/impl.go index 2d4dbb4fc..30d448313 100644 --- a/backend/plugins/gitlab/impl/impl.go +++ b/backend/plugins/gitlab/impl/impl.go @@ -233,6 +233,11 @@ func (p Gitlab) MigrationScripts() []plugin.MigrationScript { return migrationscripts.All() } +func (p Gitlab) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p Gitlab) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/plugins/icla/impl/impl.go b/backend/plugins/icla/impl/impl.go index 1f327f313..243074126 100644 --- a/backend/plugins/icla/impl/impl.go +++ b/backend/plugins/icla/impl/impl.go @@ -104,6 +104,10 @@ func (p Icla) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return nil } +func (p Icla) TestConnection(id uint64) errors.Error { + return nil +} + func (p Icla) Close(taskCtx plugin.TaskContext) errors.Error { data, ok := taskCtx.GetData().(*tasks.IclaTaskData) if !ok { diff --git a/backend/plugins/issue_trace/impl/enricher.go b/backend/plugins/issue_trace/impl/enricher.go index 1506f2563..aeca45ade 100644 --- a/backend/plugins/issue_trace/impl/enricher.go +++ b/backend/plugins/issue_trace/impl/enricher.go @@ -19,7 +19,6 @@ package impl import ( "encoding/json" - "github.com/apache/incubator-devlake/core/context" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" @@ -151,6 +150,10 @@ func (p IssueTrace) GetTablesInfo() []dal.Tabler { } } +func (p IssueTrace) TestConnection(id uint64) errors.Error { + return nil +} + func (p IssueTrace) MakeMetricPluginPipelinePlanV200(projectName string, options json.RawMessage) (coreModels.PipelinePlan, errors.Error) { op := &tasks.Options{} if options != nil && string(options) != "\"\"" { diff --git a/backend/plugins/jenkins/impl/impl.go b/backend/plugins/jenkins/impl/impl.go index 7411f7543..2068ad2b8 100644 --- a/backend/plugins/jenkins/impl/impl.go +++ b/backend/plugins/jenkins/impl/impl.go @@ -176,6 +176,11 @@ func (p Jenkins) MakeDataSourcePipelinePlanV200( return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes, skipCollectors) } +func (p Jenkins) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p Jenkins) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/plugins/jira/impl/impl.go b/backend/plugins/jira/impl/impl.go index b7a62da20..51e623f32 100644 --- a/backend/plugins/jira/impl/impl.go +++ b/backend/plugins/jira/impl/impl.go @@ -285,6 +285,11 @@ func (p Jira) MigrationScripts() []plugin.MigrationScript { return migrationscripts.All() } +func (p Jira) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p Jira) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/plugins/linker/impl/impl.go b/backend/plugins/linker/impl/impl.go index 917b22df9..27b9a6cef 100644 --- a/backend/plugins/linker/impl/impl.go +++ b/backend/plugins/linker/impl/impl.go @@ -125,3 +125,7 @@ func (p Linker) MakeMetricPluginPipelinePlanV200(projectName string, options jso } return plan, nil } + +func (p Linker) TestConnection(id uint64) errors.Error { + return nil +} diff --git a/backend/plugins/opsgenie/impl/impl.go b/backend/plugins/opsgenie/impl/impl.go index 5785d53ff..7da8552e6 100644 --- a/backend/plugins/opsgenie/impl/impl.go +++ b/backend/plugins/opsgenie/impl/impl.go @@ -155,6 +155,11 @@ func (p Opsgenie) Close(taskCtx plugin.TaskContext) errors.Error { return nil } +func (p Opsgenie) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p Opsgenie) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/plugins/org/impl/impl.go b/backend/plugins/org/impl/impl.go index cd4ffa7fd..f0e6c4a0a 100644 --- a/backend/plugins/org/impl/impl.go +++ b/backend/plugins/org/impl/impl.go @@ -59,11 +59,33 @@ func (p Org) Name() string { func (p Org) SubTaskMetas() []plugin.SubTaskMeta { return []plugin.SubTaskMeta{ + tasks.TaskCheckTokenMeta, tasks.ConnectUserAccountsExactMeta, tasks.SetProjectMappingMeta, } } +func (p Org) MakePipeline(skipCollectors bool, projectName string, connections []plugin.ProjectTokenCheckerConnection) (coreModels.PipelinePlan, errors.Error) { + var plan coreModels.PipelinePlan + var stage coreModels.PipelineStage + + // construct task options for Org + options := make(map[string]interface{}) + if !skipCollectors { + options["projectConnections"] = connections + } + + stage = append(stage, &coreModels.PipelineTask{ + Plugin: p.Name(), + Subtasks: []string{ + tasks.TaskCheckTokenMeta.Name, + }, + Options: options, + }) + plan = append(plan, stage) + return plan, nil +} + func (p Org) MapProject(projectName string, scopes []plugin.Scope) (coreModels.PipelinePlan, errors.Error) { var plan coreModels.PipelinePlan var stage coreModels.PipelineStage diff --git a/backend/plugins/org/tasks/check_token.go b/backend/plugins/org/tasks/check_token.go new file mode 100644 index 000000000..d7b624bba --- /dev/null +++ b/backend/plugins/org/tasks/check_token.go @@ -0,0 +1,76 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tasks + +import ( + "fmt" + "github.com/apache/incubator-devlake/core/errors" + "github.com/apache/incubator-devlake/core/plugin" + "golang.org/x/sync/errgroup" +) + +var TaskCheckTokenMeta = plugin.SubTaskMeta{ + Name: "checkTokens", + EntryPoint: checkProjectTokens, + EnabledByDefault: true, + Description: "set project mapping", + DomainTypes: []string{plugin.DOMAIN_TYPE_CROSS}, +} + +func checkProjectTokens(taskCtx plugin.SubTaskContext) errors.Error { + logger := taskCtx.GetLogger() + taskData := taskCtx.GetData().(*TaskData) + connections := taskData.Options.ProjectConnections + logger.Debug("connections %+v", connections) + if len(connections) == 0 { + return nil + } + + g := new(errgroup.Group) + for _, connection := range connections { + conn := connection + logger.Debug("check conn: %+v", conn) + g.Go(func() error { + if err := checkConnectionToken(conn.ConnectionId, conn.PluginName); err != nil { + return err + } + return nil + }) + } + if err := g.Wait(); err != nil { + return errors.Convert(err) + } + return nil +} + +func checkConnectionToken(connectionID uint64, pluginName string) errors.Error { + pluginEntry, err := plugin.GetPlugin(pluginName) + if err != nil { + return err + } + if v, ok := pluginEntry.(plugin.PluginTestConnectionAPI); ok { + if err := v.TestConnection(connectionID); err != nil { + return err + } + return nil + } else { + msg := fmt.Sprintf("plugin: %s doesn't impl test connection api", pluginName) + return errors.Default.New(msg) + } + return nil +} diff --git a/backend/plugins/org/tasks/task_data.go b/backend/plugins/org/tasks/task_data.go index ca2a9b300..c92954740 100644 --- a/backend/plugins/org/tasks/task_data.go +++ b/backend/plugins/org/tasks/task_data.go @@ -20,8 +20,14 @@ package tasks import "github.com/apache/incubator-devlake/core/plugin" type Options struct { - ConnectionId uint64 `json:"connectionId"` - ProjectMappings []ProjectMapping `json:"projectMappings"` + ConnectionId uint64 `json:"connectionId"` + ProjectMappings []ProjectMapping `json:"projectMappings"` + ProjectConnections []ProjectConnection `json:"projectConnections"` +} + +type ProjectConnection struct { + PluginName string + ConnectionId uint64 } // ProjectMapping represents the relations between project and scopes diff --git a/backend/plugins/pagerduty/impl/impl.go b/backend/plugins/pagerduty/impl/impl.go index b237b30e4..827df8701 100644 --- a/backend/plugins/pagerduty/impl/impl.go +++ b/backend/plugins/pagerduty/impl/impl.go @@ -138,6 +138,11 @@ func (p PagerDuty) MigrationScripts() []plugin.MigrationScript { return migrationscripts.All() } +func (p PagerDuty) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p PagerDuty) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/plugins/refdiff/impl/impl.go b/backend/plugins/refdiff/impl/impl.go index b5572f643..e660b1424 100644 --- a/backend/plugins/refdiff/impl/impl.go +++ b/backend/plugins/refdiff/impl/impl.go @@ -110,3 +110,7 @@ func (p RefDiff) RootPkgPath() string { func (p RefDiff) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return nil } + +func (p RefDiff) TestConnection(id uint64) errors.Error { + return nil +} diff --git a/backend/plugins/slack/impl/impl.go b/backend/plugins/slack/impl/impl.go index 8b3de9142..9ac894373 100644 --- a/backend/plugins/slack/impl/impl.go +++ b/backend/plugins/slack/impl/impl.go @@ -131,6 +131,11 @@ func (p Slack) MigrationScripts() []plugin.MigrationScript { return migrationscripts.All() } +func (p Slack) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p Slack) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/plugins/sonarqube/impl/impl.go b/backend/plugins/sonarqube/impl/impl.go index b6d214032..f0da89812 100644 --- a/backend/plugins/sonarqube/impl/impl.go +++ b/backend/plugins/sonarqube/impl/impl.go @@ -170,6 +170,11 @@ func (p Sonarqube) MigrationScripts() []plugin.MigrationScript { return migrationscripts.All() } +func (p Sonarqube) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p Sonarqube) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/plugins/starrocks/impl/impl.go b/backend/plugins/starrocks/impl/impl.go index 511b8c7ec..1e4f15827 100644 --- a/backend/plugins/starrocks/impl/impl.go +++ b/backend/plugins/starrocks/impl/impl.go @@ -67,3 +67,7 @@ func (s StarRocks) Name() string { func (s StarRocks) RootPkgPath() string { return "github.com/merico-dev/lake/plugins/starrocks" } + +func (p StarRocks) TestConnection(id uint64) errors.Error { + return nil +} diff --git a/backend/plugins/tapd/impl/impl.go b/backend/plugins/tapd/impl/impl.go index 2885a4337..2e133d821 100644 --- a/backend/plugins/tapd/impl/impl.go +++ b/backend/plugins/tapd/impl/impl.go @@ -265,6 +265,11 @@ func (p Tapd) MigrationScripts() []plugin.MigrationScript { return migrationscripts.All() } +func (p Tapd) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p Tapd) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/plugins/teambition/impl/impl.go b/backend/plugins/teambition/impl/impl.go index 662cace0f..728602d86 100644 --- a/backend/plugins/teambition/impl/impl.go +++ b/backend/plugins/teambition/impl/impl.go @@ -169,6 +169,11 @@ func (p Teambition) MigrationScripts() []plugin.MigrationScript { return migrationscripts.All() } +func (p Teambition) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p Teambition) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/plugins/trello/impl/impl.go b/backend/plugins/trello/impl/impl.go index b3a0f088d..fdcb10fdc 100644 --- a/backend/plugins/trello/impl/impl.go +++ b/backend/plugins/trello/impl/impl.go @@ -151,6 +151,11 @@ func (p Trello) ScopeConfig() dal.Tabler { return &models.TrelloScopeConfig{} } +func (p Trello) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p Trello) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/plugins/webhook/impl/impl.go b/backend/plugins/webhook/impl/impl.go index ada304783..90e1f0198 100644 --- a/backend/plugins/webhook/impl/impl.go +++ b/backend/plugins/webhook/impl/impl.go @@ -77,6 +77,10 @@ func (p Webhook) MigrationScripts() []plugin.MigrationScript { return migrationscripts.All() } +func (p Webhook) TestConnection(id uint64) errors.Error { + return nil +} + func (p Webhook) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "connections": { diff --git a/backend/plugins/zentao/impl/impl.go b/backend/plugins/zentao/impl/impl.go index 5aa74c966..55a98165d 100644 --- a/backend/plugins/zentao/impl/impl.go +++ b/backend/plugins/zentao/impl/impl.go @@ -277,6 +277,11 @@ func (p Zentao) MigrationScripts() []plugin.MigrationScript { return migrationscripts.All() } +func (p Zentao) TestConnection(id uint64) errors.Error { + _, err := api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p Zentao) ApiResources() map[string]map[string]plugin.ApiResourceHandler { return map[string]map[string]plugin.ApiResourceHandler{ "test": { diff --git a/backend/server/services/blueprint.go b/backend/server/services/blueprint.go index d400abae2..01395bb24 100644 --- a/backend/server/services/blueprint.go +++ b/backend/server/services/blueprint.go @@ -20,6 +20,9 @@ package services import ( "encoding/json" "fmt" + "github.com/apache/incubator-devlake/core/log" + "github.com/apache/incubator-devlake/core/plugin" + "golang.org/x/sync/errgroup" "strings" "sync" @@ -412,6 +415,11 @@ func TriggerBlueprint(id uint64, triggerSyncPolicy *models.TriggerSyncPolicy, sh } blueprint.SkipCollectors = triggerSyncPolicy.SkipCollectors blueprint.FullSync = triggerSyncPolicy.FullSync + + if err := checkBlueprintTokens(blueprint, triggerSyncPolicy); err != nil { + return nil, errors.Default.Wrap(err, "check blue print tokens") + } + pipeline, err := createPipelineByBlueprint(blueprint, &models.SyncPolicy{ SkipOnFail: false, TimeAfter: nil, @@ -427,3 +435,70 @@ func TriggerBlueprint(id uint64, triggerSyncPolicy *models.TriggerSyncPolicy, sh } return pipeline, nil } + +func needToCheckToken(triggerSyncPolicy *models.TriggerSyncPolicy) bool { + // case1: retransform + if triggerSyncPolicy.SkipCollectors && triggerSyncPolicy.FullSync == false { + return false + } + // case2: collect data + if triggerSyncPolicy.SkipCollectors == false && triggerSyncPolicy.FullSync == false { + return true + } + // case3: collect data with fullsync + if triggerSyncPolicy.SkipCollectors == false && triggerSyncPolicy.FullSync == true { + return true + } + // case4: others + return true +} + +func checkBlueprintTokens(blueprint *models.Blueprint, triggerSyncPolicy *models.TriggerSyncPolicy) errors.Error { + if blueprint == nil { + return errors.Default.New("blueprint is nil") + } + if triggerSyncPolicy == nil { + return errors.Default.New("triggerSyncPolicy is nil") + } + + if !needToCheckToken(triggerSyncPolicy) { + return nil + } + + if len(blueprint.Connections) == 0 { + return nil + } + + g := new(errgroup.Group) + for _, connection := range blueprint.Connections { + conn := *connection + g.Go(func() error { + if err := checkConnectionToken(logger, conn); err != nil { + return err + } + return nil + }) + } + if err := g.Wait(); err != nil { + return errors.Convert(err) + } + + return nil +} + +func checkConnectionToken(logger log.Logger, connection models.BlueprintConnection) errors.Error { + pluginEntry, err := plugin.GetPlugin(connection.PluginName) + if err != nil { + return err + } + if v, ok := pluginEntry.(plugin.PluginTestConnectionAPI); ok { + if err := v.TestConnection(connection.ConnectionId); err != nil { + logger.Error(err, "plugin: %s, id: %d", connection.PluginName, connection.ConnectionId) + return err + } + return nil + } else { + msg := fmt.Sprintf("plugin: %s doesn't impl test connection api", connection.PluginName) + return errors.Default.New(msg) + } +} diff --git a/backend/server/services/blueprint_makeplan_v200.go b/backend/server/services/blueprint_makeplan_v200.go index 727e843de..960465069 100644 --- a/backend/server/services/blueprint_makeplan_v200.go +++ b/backend/server/services/blueprint_makeplan_v200.go @@ -121,11 +121,25 @@ func GeneratePlanJsonV200( } } var planForProjectMapping coreModels.PipelinePlan + var planForProjectTokenChecker coreModels.PipelinePlan if projectName != "" { p, err := plugin.GetPlugin("org") if err != nil { return nil, errors.Default.Wrap(err, "get plugin org") } + if pluginBp, ok := p.(plugin.ProjectTokenChecker); ok { + var simpleConns []plugin.ProjectTokenCheckerConnection + for _, connection := range connections { + simpleConns = append(simpleConns, plugin.ProjectTokenCheckerConnection{ + PluginName: connection.PluginName, + ConnectionId: connection.ConnectionId, + }) + } + planForProjectTokenChecker, err = pluginBp.MakePipeline(skipCollectors, projectName, simpleConns) + if err != nil { + return nil, errors.Default.Wrap(err, "org token checker make pipeline") + } + } if pluginBp, ok := p.(plugin.ProjectMapper); ok { planForProjectMapping, err = pluginBp.MapProject(projectName, scopes) if err != nil { @@ -134,6 +148,7 @@ func GeneratePlanJsonV200( } } plan := SequentializePipelinePlans( + planForProjectTokenChecker, planForProjectMapping, ParallelizePipelinePlans(sourcePlans...), ParallelizePipelinePlans(metricPlans...), diff --git a/backend/server/services/project.go b/backend/server/services/project.go index 8e9528d38..990dbae2e 100644 --- a/backend/server/services/project.go +++ b/backend/server/services/project.go @@ -217,7 +217,7 @@ func PatchProject(name string, body map[string]interface{}) (*models.ApiOutputPr return nil, err } - // allowed to changed the name + // allowed to change the name if projectInput.Name == "" { projectInput.Name = name } diff --git a/backend/server/services/remote/models/plugin_remote.go b/backend/server/services/remote/models/plugin_remote.go index 988283630..3c97d8021 100644 --- a/backend/server/services/remote/models/plugin_remote.go +++ b/backend/server/services/remote/models/plugin_remote.go @@ -30,4 +30,5 @@ type RemotePlugin interface { plugin.PluginModel plugin.PluginMigration plugin.PluginSource + plugin.PluginTestConnectionAPI } diff --git a/backend/server/services/remote/plugin/plugin_impl.go b/backend/server/services/remote/plugin/plugin_impl.go index 6242b1600..ea7e912c5 100644 --- a/backend/server/services/remote/plugin/plugin_impl.go +++ b/backend/server/services/remote/plugin/plugin_impl.go @@ -230,6 +230,11 @@ func (p *remotePluginImpl) ApiResources() map[string]map[string]plugin.ApiResour return p.resources } +func (p *remotePluginImpl) TestConnection(id uint64) errors.Error { + _, err := p.resources["connections/:connectionId/test"]["POST"](api.GenerateTestingConnectionApiResourceInput(id)) + return err +} + func (p *remotePluginImpl) OpenApiSpec() string { return p.openApiSpec }
