This is an automated email from the ASF dual-hosted git repository.

lynwee pushed a commit to branch dev-1
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git

commit 2e69df0bc44a6c2f8d264c564bd0379ba9c74c79
Author: d4x1 <[email protected]>
AuthorDate: Sat Sep 14 15:34:02 2024 +0800

    feat(pipeline): check pipelines' tokens before executing/creating pipeline
---
 backend/core/plugin/plugin_api.go                  |  4 ++
 backend/core/plugin/plugin_blueprint.go            | 10 +++
 backend/helpers/pluginhelper/api/misc_helpers.go   | 10 +++
 backend/plugins/ae/impl/impl.go                    |  5 ++
 backend/plugins/azuredevops_go/impl/impl.go        |  5 ++
 backend/plugins/bamboo/impl/impl.go                |  5 ++
 backend/plugins/bitbucket/impl/impl.go             |  5 ++
 backend/plugins/bitbucket_server/impl/impl.go      |  5 ++
 backend/plugins/circleci/impl/impl.go              |  5 ++
 backend/plugins/customize/impl/impl.go             |  4 ++
 backend/plugins/dbt/impl/impl.go                   |  4 ++
 backend/plugins/dora/impl/impl.go                  |  5 +-
 backend/plugins/feishu/impl/impl.go                |  5 ++
 backend/plugins/gitee/impl/impl.go                 |  5 ++
 backend/plugins/gitextractor/impl/impl.go          |  4 ++
 backend/plugins/github/impl/impl.go                |  5 ++
 backend/plugins/gitlab/impl/impl.go                |  5 ++
 backend/plugins/icla/impl/impl.go                  |  4 ++
 backend/plugins/issue_trace/impl/enricher.go       |  5 +-
 backend/plugins/jenkins/impl/impl.go               |  5 ++
 backend/plugins/jira/impl/impl.go                  |  5 ++
 backend/plugins/linker/impl/impl.go                |  4 ++
 backend/plugins/opsgenie/impl/impl.go              |  5 ++
 backend/plugins/org/impl/impl.go                   | 22 +++++++
 backend/plugins/org/tasks/check_token.go           | 76 ++++++++++++++++++++++
 backend/plugins/org/tasks/task_data.go             | 10 ++-
 backend/plugins/pagerduty/impl/impl.go             |  5 ++
 backend/plugins/refdiff/impl/impl.go               |  4 ++
 backend/plugins/slack/impl/impl.go                 |  5 ++
 backend/plugins/sonarqube/impl/impl.go             |  5 ++
 backend/plugins/starrocks/impl/impl.go             |  4 ++
 backend/plugins/tapd/impl/impl.go                  |  5 ++
 backend/plugins/teambition/impl/impl.go            |  5 ++
 backend/plugins/trello/impl/impl.go                |  5 ++
 backend/plugins/webhook/impl/impl.go               |  4 ++
 backend/plugins/zentao/impl/impl.go                |  5 ++
 backend/server/services/blueprint.go               | 75 +++++++++++++++++++++
 backend/server/services/blueprint_makeplan_v200.go | 15 +++++
 backend/server/services/project.go                 |  2 +-
 .../server/services/remote/models/plugin_remote.go |  1 +
 .../server/services/remote/plugin/plugin_impl.go   |  5 ++
 41 files changed, 367 insertions(+), 5 deletions(-)

diff --git a/backend/core/plugin/plugin_api.go 
b/backend/core/plugin/plugin_api.go
index f9195d2ee..ea41a4895 100644
--- a/backend/core/plugin/plugin_api.go
+++ b/backend/core/plugin/plugin_api.go
@@ -73,6 +73,10 @@ type PluginApi interface {
        ApiResources() map[string]map[string]ApiResourceHandler
 }
 
+type PluginTestConnectionAPI interface {
+       TestConnection(id uint64) errors.Error
+}
+
 const wrapResponseError = "WRAP_RESPONSE_ERROR"
 
 func WrapTestConnectionErrResp(basicRes context.BasicRes, err errors.Error) 
errors.Error {
diff --git a/backend/core/plugin/plugin_blueprint.go 
b/backend/core/plugin/plugin_blueprint.go
index b722bb9e2..a24513564 100644
--- a/backend/core/plugin/plugin_blueprint.go
+++ b/backend/core/plugin/plugin_blueprint.go
@@ -93,6 +93,16 @@ type ProjectMapper interface {
        MapProject(projectName string, scopes []Scope) (models.PipelinePlan, 
errors.Error)
 }
 
+type ProjectTokenCheckerConnection struct {
+       PluginName   string
+       ConnectionId uint64
+}
+
+// ProjectTokenChecker is implemented by the plugin org, which generate a task 
tp check all connection's tokens
+type ProjectTokenChecker interface {
+       MakePipeline(skipCollectors bool, projectName string, scopes 
[]ProjectTokenCheckerConnection) (models.PipelinePlan, errors.Error)
+}
+
 // CompositeDataSourcePluginBlueprintV200 is for unit test
 type CompositeDataSourcePluginBlueprintV200 interface {
        PluginMeta
diff --git a/backend/helpers/pluginhelper/api/misc_helpers.go 
b/backend/helpers/pluginhelper/api/misc_helpers.go
index e7315ceab..3dcf8778c 100644
--- a/backend/helpers/pluginhelper/api/misc_helpers.go
+++ b/backend/helpers/pluginhelper/api/misc_helpers.go
@@ -18,9 +18,11 @@ limitations under the License.
 package api
 
 import (
+       "fmt"
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/models"
+       "github.com/apache/incubator-devlake/core/plugin"
 )
 
 // CallDB wraps DB calls with this signature, and handles the case if the 
struct is wrapped in a models.DynamicTabler.
@@ -31,3 +33,11 @@ func CallDB(f func(any, ...dal.Clause) errors.Error, x any, 
clauses ...dal.Claus
        }
        return f(x, clauses...)
 }
+
+func GenerateTestingConnectionApiResourceInput(connectionID uint64) 
*plugin.ApiResourceInput {
+       return &plugin.ApiResourceInput{
+               Params: map[string]string{
+                       "connectionId": fmt.Sprintf("%d", connectionID),
+               },
+       }
+}
diff --git a/backend/plugins/ae/impl/impl.go b/backend/plugins/ae/impl/impl.go
index 389857e1f..7356d9e0e 100644
--- a/backend/plugins/ae/impl/impl.go
+++ b/backend/plugins/ae/impl/impl.go
@@ -130,6 +130,11 @@ func (p AE) MigrationScripts() []plugin.MigrationScript {
        return migrationscripts.All()
 }
 
+func (p AE) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p AE) ApiResources() map[string]map[string]plugin.ApiResourceHandler {
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/plugins/azuredevops_go/impl/impl.go 
b/backend/plugins/azuredevops_go/impl/impl.go
index c567e49f5..2aecfde51 100644
--- a/backend/plugins/azuredevops_go/impl/impl.go
+++ b/backend/plugins/azuredevops_go/impl/impl.go
@@ -255,6 +255,11 @@ func (p Azuredevops) ApiResources() 
map[string]map[string]plugin.ApiResourceHand
        }
 }
 
+func (p Azuredevops) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p Azuredevops) MakeDataSourcePipelinePlanV200(
        connectionId uint64,
        scopes []*coreModels.BlueprintScope,
diff --git a/backend/plugins/bamboo/impl/impl.go 
b/backend/plugins/bamboo/impl/impl.go
index 2aae7ad5e..7f745c6dc 100644
--- a/backend/plugins/bamboo/impl/impl.go
+++ b/backend/plugins/bamboo/impl/impl.go
@@ -209,6 +209,11 @@ func (p Bamboo) MigrationScripts() 
[]plugin.MigrationScript {
        return migrationscripts.All()
 }
 
+func (p Bamboo) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p Bamboo) ApiResources() map[string]map[string]plugin.ApiResourceHandler 
{
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/plugins/bitbucket/impl/impl.go 
b/backend/plugins/bitbucket/impl/impl.go
index 9a23ab1b3..7eeaf98fa 100644
--- a/backend/plugins/bitbucket/impl/impl.go
+++ b/backend/plugins/bitbucket/impl/impl.go
@@ -207,6 +207,11 @@ func (p Bitbucket) MakeDataSourcePipelinePlanV200(
        return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes, skipCollectors)
 }
 
+func (p Bitbucket) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p Bitbucket) ApiResources() 
map[string]map[string]plugin.ApiResourceHandler {
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/plugins/bitbucket_server/impl/impl.go 
b/backend/plugins/bitbucket_server/impl/impl.go
index 18b4a4679..fe446d76f 100644
--- a/backend/plugins/bitbucket_server/impl/impl.go
+++ b/backend/plugins/bitbucket_server/impl/impl.go
@@ -170,6 +170,11 @@ func (p BitbucketServer) MakeDataSourcePipelinePlanV200(
        return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes, skipCollectors)
 }
 
+func (p BitbucketServer) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p BitbucketServer) ApiResources() 
map[string]map[string]plugin.ApiResourceHandler {
        return map[string]map[string]plugin.ApiResourceHandler{
                "connections/:connectionId/test": {
diff --git a/backend/plugins/circleci/impl/impl.go 
b/backend/plugins/circleci/impl/impl.go
index 8e052711a..0af6b7db4 100644
--- a/backend/plugins/circleci/impl/impl.go
+++ b/backend/plugins/circleci/impl/impl.go
@@ -172,6 +172,11 @@ func (p Circleci) MigrationScripts() 
[]plugin.MigrationScript {
        return migrationscripts.All()
 }
 
+func (p Circleci) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p Circleci) ApiResources() 
map[string]map[string]plugin.ApiResourceHandler {
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/plugins/customize/impl/impl.go 
b/backend/plugins/customize/impl/impl.go
index facb0515d..59b967aa5 100644
--- a/backend/plugins/customize/impl/impl.go
+++ b/backend/plugins/customize/impl/impl.go
@@ -110,3 +110,7 @@ func (p Customize) ApiResources() 
map[string]map[string]plugin.ApiResourceHandle
                },
        }
 }
+
+func (p Customize) TestConnection(id uint64) errors.Error {
+       return nil
+}
diff --git a/backend/plugins/dbt/impl/impl.go b/backend/plugins/dbt/impl/impl.go
index 88eafa9f6..55b7c2918 100644
--- a/backend/plugins/dbt/impl/impl.go
+++ b/backend/plugins/dbt/impl/impl.go
@@ -74,3 +74,7 @@ func (p Dbt) RootPkgPath() string {
 func (p Dbt) Name() string {
        return "dbt"
 }
+
+func (p Dbt) TestConnection(id uint64) errors.Error {
+       return nil
+}
diff --git a/backend/plugins/dora/impl/impl.go 
b/backend/plugins/dora/impl/impl.go
index 246097398..0d41a94eb 100644
--- a/backend/plugins/dora/impl/impl.go
+++ b/backend/plugins/dora/impl/impl.go
@@ -19,7 +19,6 @@ package impl
 
 import (
        "encoding/json"
-
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        coreModels "github.com/apache/incubator-devlake/core/models"
@@ -168,3 +167,7 @@ func (p Dora) MakeMetricPluginPipelinePlanV200(projectName 
string, options json.
        }
        return plan, nil
 }
+
+func (p Dora) TestConnection(id uint64) errors.Error {
+       return nil
+}
diff --git a/backend/plugins/feishu/impl/impl.go 
b/backend/plugins/feishu/impl/impl.go
index 92e0f1571..3158b5f1f 100644
--- a/backend/plugins/feishu/impl/impl.go
+++ b/backend/plugins/feishu/impl/impl.go
@@ -132,6 +132,11 @@ func (p Feishu) MigrationScripts() 
[]plugin.MigrationScript {
        return migrationscripts.All()
 }
 
+func (p Feishu) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p Feishu) ApiResources() map[string]map[string]plugin.ApiResourceHandler 
{
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/plugins/gitee/impl/impl.go 
b/backend/plugins/gitee/impl/impl.go
index 35be389e7..b422fbafc 100644
--- a/backend/plugins/gitee/impl/impl.go
+++ b/backend/plugins/gitee/impl/impl.go
@@ -187,6 +187,11 @@ func (p Gitee) MigrationScripts() []plugin.MigrationScript 
{
        return migrationscripts.All()
 }
 
+func (p Gitee) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p Gitee) ApiResources() map[string]map[string]plugin.ApiResourceHandler {
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/plugins/gitextractor/impl/impl.go 
b/backend/plugins/gitextractor/impl/impl.go
index 468e51734..45c84ab6e 100644
--- a/backend/plugins/gitextractor/impl/impl.go
+++ b/backend/plugins/gitextractor/impl/impl.go
@@ -122,3 +122,7 @@ func (p GitExtractor) Close(taskCtx plugin.TaskContext) 
errors.Error {
 func (p GitExtractor) RootPkgPath() string {
        return "github.com/apache/incubator-devlake/plugins/gitextractor"
 }
+
+func (p GitExtractor) TestConnection(id uint64) errors.Error {
+       return nil
+}
diff --git a/backend/plugins/github/impl/impl.go 
b/backend/plugins/github/impl/impl.go
index 07342b786..e9f1469af 100644
--- a/backend/plugins/github/impl/impl.go
+++ b/backend/plugins/github/impl/impl.go
@@ -180,6 +180,11 @@ func (p Github) MigrationScripts() 
[]plugin.MigrationScript {
        return migrationscripts.All()
 }
 
+func (p Github) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p Github) ApiResources() map[string]map[string]plugin.ApiResourceHandler 
{
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/plugins/gitlab/impl/impl.go 
b/backend/plugins/gitlab/impl/impl.go
index 2d4dbb4fc..30d448313 100644
--- a/backend/plugins/gitlab/impl/impl.go
+++ b/backend/plugins/gitlab/impl/impl.go
@@ -233,6 +233,11 @@ func (p Gitlab) MigrationScripts() 
[]plugin.MigrationScript {
        return migrationscripts.All()
 }
 
+func (p Gitlab) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p Gitlab) ApiResources() map[string]map[string]plugin.ApiResourceHandler 
{
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/plugins/icla/impl/impl.go 
b/backend/plugins/icla/impl/impl.go
index 1f327f313..243074126 100644
--- a/backend/plugins/icla/impl/impl.go
+++ b/backend/plugins/icla/impl/impl.go
@@ -104,6 +104,10 @@ func (p Icla) ApiResources() 
map[string]map[string]plugin.ApiResourceHandler {
        return nil
 }
 
+func (p Icla) TestConnection(id uint64) errors.Error {
+       return nil
+}
+
 func (p Icla) Close(taskCtx plugin.TaskContext) errors.Error {
        data, ok := taskCtx.GetData().(*tasks.IclaTaskData)
        if !ok {
diff --git a/backend/plugins/issue_trace/impl/enricher.go 
b/backend/plugins/issue_trace/impl/enricher.go
index 1506f2563..aeca45ade 100644
--- a/backend/plugins/issue_trace/impl/enricher.go
+++ b/backend/plugins/issue_trace/impl/enricher.go
@@ -19,7 +19,6 @@ package impl
 
 import (
        "encoding/json"
-
        "github.com/apache/incubator-devlake/core/context"
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
@@ -151,6 +150,10 @@ func (p IssueTrace) GetTablesInfo() []dal.Tabler {
        }
 }
 
+func (p IssueTrace) TestConnection(id uint64) errors.Error {
+       return nil
+}
+
 func (p IssueTrace) MakeMetricPluginPipelinePlanV200(projectName string, 
options json.RawMessage) (coreModels.PipelinePlan, errors.Error) {
        op := &tasks.Options{}
        if options != nil && string(options) != "\"\"" {
diff --git a/backend/plugins/jenkins/impl/impl.go 
b/backend/plugins/jenkins/impl/impl.go
index 7411f7543..2068ad2b8 100644
--- a/backend/plugins/jenkins/impl/impl.go
+++ b/backend/plugins/jenkins/impl/impl.go
@@ -176,6 +176,11 @@ func (p Jenkins) MakeDataSourcePipelinePlanV200(
        return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes, skipCollectors)
 }
 
+func (p Jenkins) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p Jenkins) ApiResources() 
map[string]map[string]plugin.ApiResourceHandler {
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/plugins/jira/impl/impl.go 
b/backend/plugins/jira/impl/impl.go
index b7a62da20..51e623f32 100644
--- a/backend/plugins/jira/impl/impl.go
+++ b/backend/plugins/jira/impl/impl.go
@@ -285,6 +285,11 @@ func (p Jira) MigrationScripts() []plugin.MigrationScript {
        return migrationscripts.All()
 }
 
+func (p Jira) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p Jira) ApiResources() map[string]map[string]plugin.ApiResourceHandler {
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/plugins/linker/impl/impl.go 
b/backend/plugins/linker/impl/impl.go
index 917b22df9..27b9a6cef 100644
--- a/backend/plugins/linker/impl/impl.go
+++ b/backend/plugins/linker/impl/impl.go
@@ -125,3 +125,7 @@ func (p Linker) 
MakeMetricPluginPipelinePlanV200(projectName string, options jso
        }
        return plan, nil
 }
+
+func (p Linker) TestConnection(id uint64) errors.Error {
+       return nil
+}
diff --git a/backend/plugins/opsgenie/impl/impl.go 
b/backend/plugins/opsgenie/impl/impl.go
index 5785d53ff..7da8552e6 100644
--- a/backend/plugins/opsgenie/impl/impl.go
+++ b/backend/plugins/opsgenie/impl/impl.go
@@ -155,6 +155,11 @@ func (p Opsgenie) Close(taskCtx plugin.TaskContext) 
errors.Error {
        return nil
 }
 
+func (p Opsgenie) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p Opsgenie) ApiResources() 
map[string]map[string]plugin.ApiResourceHandler {
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/plugins/org/impl/impl.go b/backend/plugins/org/impl/impl.go
index cd4ffa7fd..f0e6c4a0a 100644
--- a/backend/plugins/org/impl/impl.go
+++ b/backend/plugins/org/impl/impl.go
@@ -59,11 +59,33 @@ func (p Org) Name() string {
 
 func (p Org) SubTaskMetas() []plugin.SubTaskMeta {
        return []plugin.SubTaskMeta{
+               tasks.TaskCheckTokenMeta,
                tasks.ConnectUserAccountsExactMeta,
                tasks.SetProjectMappingMeta,
        }
 }
 
+func (p Org) MakePipeline(skipCollectors bool, projectName string, connections 
[]plugin.ProjectTokenCheckerConnection) (coreModels.PipelinePlan, errors.Error) 
{
+       var plan coreModels.PipelinePlan
+       var stage coreModels.PipelineStage
+
+       // construct task options for Org
+       options := make(map[string]interface{})
+       if !skipCollectors {
+               options["projectConnections"] = connections
+       }
+
+       stage = append(stage, &coreModels.PipelineTask{
+               Plugin: p.Name(),
+               Subtasks: []string{
+                       tasks.TaskCheckTokenMeta.Name,
+               },
+               Options: options,
+       })
+       plan = append(plan, stage)
+       return plan, nil
+}
+
 func (p Org) MapProject(projectName string, scopes []plugin.Scope) 
(coreModels.PipelinePlan, errors.Error) {
        var plan coreModels.PipelinePlan
        var stage coreModels.PipelineStage
diff --git a/backend/plugins/org/tasks/check_token.go 
b/backend/plugins/org/tasks/check_token.go
new file mode 100644
index 000000000..d7b624bba
--- /dev/null
+++ b/backend/plugins/org/tasks/check_token.go
@@ -0,0 +1,76 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+       "fmt"
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/core/plugin"
+       "golang.org/x/sync/errgroup"
+)
+
+var TaskCheckTokenMeta = plugin.SubTaskMeta{
+       Name:             "checkTokens",
+       EntryPoint:       checkProjectTokens,
+       EnabledByDefault: true,
+       Description:      "set project mapping",
+       DomainTypes:      []string{plugin.DOMAIN_TYPE_CROSS},
+}
+
+func checkProjectTokens(taskCtx plugin.SubTaskContext) errors.Error {
+       logger := taskCtx.GetLogger()
+       taskData := taskCtx.GetData().(*TaskData)
+       connections := taskData.Options.ProjectConnections
+       logger.Debug("connections %+v", connections)
+       if len(connections) == 0 {
+               return nil
+       }
+
+       g := new(errgroup.Group)
+       for _, connection := range connections {
+               conn := connection
+               logger.Debug("check conn: %+v", conn)
+               g.Go(func() error {
+                       if err := checkConnectionToken(conn.ConnectionId, 
conn.PluginName); err != nil {
+                               return err
+                       }
+                       return nil
+               })
+       }
+       if err := g.Wait(); err != nil {
+               return errors.Convert(err)
+       }
+       return nil
+}
+
+func checkConnectionToken(connectionID uint64, pluginName string) errors.Error 
{
+       pluginEntry, err := plugin.GetPlugin(pluginName)
+       if err != nil {
+               return err
+       }
+       if v, ok := pluginEntry.(plugin.PluginTestConnectionAPI); ok {
+               if err := v.TestConnection(connectionID); err != nil {
+                       return err
+               }
+               return nil
+       } else {
+               msg := fmt.Sprintf("plugin: %s doesn't impl test connection 
api", pluginName)
+               return errors.Default.New(msg)
+       }
+       return nil
+}
diff --git a/backend/plugins/org/tasks/task_data.go 
b/backend/plugins/org/tasks/task_data.go
index ca2a9b300..c92954740 100644
--- a/backend/plugins/org/tasks/task_data.go
+++ b/backend/plugins/org/tasks/task_data.go
@@ -20,8 +20,14 @@ package tasks
 import "github.com/apache/incubator-devlake/core/plugin"
 
 type Options struct {
-       ConnectionId    uint64           `json:"connectionId"`
-       ProjectMappings []ProjectMapping `json:"projectMappings"`
+       ConnectionId       uint64              `json:"connectionId"`
+       ProjectMappings    []ProjectMapping    `json:"projectMappings"`
+       ProjectConnections []ProjectConnection `json:"projectConnections"`
+}
+
+type ProjectConnection struct {
+       PluginName   string
+       ConnectionId uint64
 }
 
 // ProjectMapping represents the relations between project and scopes
diff --git a/backend/plugins/pagerduty/impl/impl.go 
b/backend/plugins/pagerduty/impl/impl.go
index b237b30e4..827df8701 100644
--- a/backend/plugins/pagerduty/impl/impl.go
+++ b/backend/plugins/pagerduty/impl/impl.go
@@ -138,6 +138,11 @@ func (p PagerDuty) MigrationScripts() 
[]plugin.MigrationScript {
        return migrationscripts.All()
 }
 
+func (p PagerDuty) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p PagerDuty) ApiResources() 
map[string]map[string]plugin.ApiResourceHandler {
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/plugins/refdiff/impl/impl.go 
b/backend/plugins/refdiff/impl/impl.go
index b5572f643..e660b1424 100644
--- a/backend/plugins/refdiff/impl/impl.go
+++ b/backend/plugins/refdiff/impl/impl.go
@@ -110,3 +110,7 @@ func (p RefDiff) RootPkgPath() string {
 func (p RefDiff) ApiResources() 
map[string]map[string]plugin.ApiResourceHandler {
        return nil
 }
+
+func (p RefDiff) TestConnection(id uint64) errors.Error {
+       return nil
+}
diff --git a/backend/plugins/slack/impl/impl.go 
b/backend/plugins/slack/impl/impl.go
index 8b3de9142..9ac894373 100644
--- a/backend/plugins/slack/impl/impl.go
+++ b/backend/plugins/slack/impl/impl.go
@@ -131,6 +131,11 @@ func (p Slack) MigrationScripts() []plugin.MigrationScript 
{
        return migrationscripts.All()
 }
 
+func (p Slack) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p Slack) ApiResources() map[string]map[string]plugin.ApiResourceHandler {
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/plugins/sonarqube/impl/impl.go 
b/backend/plugins/sonarqube/impl/impl.go
index b6d214032..f0da89812 100644
--- a/backend/plugins/sonarqube/impl/impl.go
+++ b/backend/plugins/sonarqube/impl/impl.go
@@ -170,6 +170,11 @@ func (p Sonarqube) MigrationScripts() 
[]plugin.MigrationScript {
        return migrationscripts.All()
 }
 
+func (p Sonarqube) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p Sonarqube) ApiResources() 
map[string]map[string]plugin.ApiResourceHandler {
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/plugins/starrocks/impl/impl.go 
b/backend/plugins/starrocks/impl/impl.go
index 511b8c7ec..1e4f15827 100644
--- a/backend/plugins/starrocks/impl/impl.go
+++ b/backend/plugins/starrocks/impl/impl.go
@@ -67,3 +67,7 @@ func (s StarRocks) Name() string {
 func (s StarRocks) RootPkgPath() string {
        return "github.com/merico-dev/lake/plugins/starrocks"
 }
+
+func (p StarRocks) TestConnection(id uint64) errors.Error {
+       return nil
+}
diff --git a/backend/plugins/tapd/impl/impl.go 
b/backend/plugins/tapd/impl/impl.go
index 2885a4337..2e133d821 100644
--- a/backend/plugins/tapd/impl/impl.go
+++ b/backend/plugins/tapd/impl/impl.go
@@ -265,6 +265,11 @@ func (p Tapd) MigrationScripts() []plugin.MigrationScript {
        return migrationscripts.All()
 }
 
+func (p Tapd) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p Tapd) ApiResources() map[string]map[string]plugin.ApiResourceHandler {
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/plugins/teambition/impl/impl.go 
b/backend/plugins/teambition/impl/impl.go
index 662cace0f..728602d86 100644
--- a/backend/plugins/teambition/impl/impl.go
+++ b/backend/plugins/teambition/impl/impl.go
@@ -169,6 +169,11 @@ func (p Teambition) MigrationScripts() 
[]plugin.MigrationScript {
        return migrationscripts.All()
 }
 
+func (p Teambition) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p Teambition) ApiResources() 
map[string]map[string]plugin.ApiResourceHandler {
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/plugins/trello/impl/impl.go 
b/backend/plugins/trello/impl/impl.go
index b3a0f088d..fdcb10fdc 100644
--- a/backend/plugins/trello/impl/impl.go
+++ b/backend/plugins/trello/impl/impl.go
@@ -151,6 +151,11 @@ func (p Trello) ScopeConfig() dal.Tabler {
        return &models.TrelloScopeConfig{}
 }
 
+func (p Trello) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p Trello) ApiResources() map[string]map[string]plugin.ApiResourceHandler 
{
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/plugins/webhook/impl/impl.go 
b/backend/plugins/webhook/impl/impl.go
index ada304783..90e1f0198 100644
--- a/backend/plugins/webhook/impl/impl.go
+++ b/backend/plugins/webhook/impl/impl.go
@@ -77,6 +77,10 @@ func (p Webhook) MigrationScripts() []plugin.MigrationScript 
{
        return migrationscripts.All()
 }
 
+func (p Webhook) TestConnection(id uint64) errors.Error {
+       return nil
+}
+
 func (p Webhook) ApiResources() 
map[string]map[string]plugin.ApiResourceHandler {
        return map[string]map[string]plugin.ApiResourceHandler{
                "connections": {
diff --git a/backend/plugins/zentao/impl/impl.go 
b/backend/plugins/zentao/impl/impl.go
index 5aa74c966..55a98165d 100644
--- a/backend/plugins/zentao/impl/impl.go
+++ b/backend/plugins/zentao/impl/impl.go
@@ -277,6 +277,11 @@ func (p Zentao) MigrationScripts() 
[]plugin.MigrationScript {
        return migrationscripts.All()
 }
 
+func (p Zentao) TestConnection(id uint64) errors.Error {
+       _, err := 
api.TestExistingConnection(helper.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p Zentao) ApiResources() map[string]map[string]plugin.ApiResourceHandler 
{
        return map[string]map[string]plugin.ApiResourceHandler{
                "test": {
diff --git a/backend/server/services/blueprint.go 
b/backend/server/services/blueprint.go
index d400abae2..01395bb24 100644
--- a/backend/server/services/blueprint.go
+++ b/backend/server/services/blueprint.go
@@ -20,6 +20,9 @@ package services
 import (
        "encoding/json"
        "fmt"
+       "github.com/apache/incubator-devlake/core/log"
+       "github.com/apache/incubator-devlake/core/plugin"
+       "golang.org/x/sync/errgroup"
        "strings"
        "sync"
 
@@ -412,6 +415,11 @@ func TriggerBlueprint(id uint64, triggerSyncPolicy 
*models.TriggerSyncPolicy, sh
        }
        blueprint.SkipCollectors = triggerSyncPolicy.SkipCollectors
        blueprint.FullSync = triggerSyncPolicy.FullSync
+
+       if err := checkBlueprintTokens(blueprint, triggerSyncPolicy); err != 
nil {
+               return nil, errors.Default.Wrap(err, "check blue print tokens")
+       }
+
        pipeline, err := createPipelineByBlueprint(blueprint, 
&models.SyncPolicy{
                SkipOnFail:        false,
                TimeAfter:         nil,
@@ -427,3 +435,70 @@ func TriggerBlueprint(id uint64, triggerSyncPolicy 
*models.TriggerSyncPolicy, sh
        }
        return pipeline, nil
 }
+
+func needToCheckToken(triggerSyncPolicy *models.TriggerSyncPolicy) bool {
+       // case1: retransform
+       if triggerSyncPolicy.SkipCollectors && triggerSyncPolicy.FullSync == 
false {
+               return false
+       }
+       // case2: collect data
+       if triggerSyncPolicy.SkipCollectors == false && 
triggerSyncPolicy.FullSync == false {
+               return true
+       }
+       // case3: collect data with fullsync
+       if triggerSyncPolicy.SkipCollectors == false && 
triggerSyncPolicy.FullSync == true {
+               return true
+       }
+       // case4: others
+       return true
+}
+
+func checkBlueprintTokens(blueprint *models.Blueprint, triggerSyncPolicy 
*models.TriggerSyncPolicy) errors.Error {
+       if blueprint == nil {
+               return errors.Default.New("blueprint is nil")
+       }
+       if triggerSyncPolicy == nil {
+               return errors.Default.New("triggerSyncPolicy is nil")
+       }
+
+       if !needToCheckToken(triggerSyncPolicy) {
+               return nil
+       }
+
+       if len(blueprint.Connections) == 0 {
+               return nil
+       }
+
+       g := new(errgroup.Group)
+       for _, connection := range blueprint.Connections {
+               conn := *connection
+               g.Go(func() error {
+                       if err := checkConnectionToken(logger, conn); err != 
nil {
+                               return err
+                       }
+                       return nil
+               })
+       }
+       if err := g.Wait(); err != nil {
+               return errors.Convert(err)
+       }
+
+       return nil
+}
+
+func checkConnectionToken(logger log.Logger, connection 
models.BlueprintConnection) errors.Error {
+       pluginEntry, err := plugin.GetPlugin(connection.PluginName)
+       if err != nil {
+               return err
+       }
+       if v, ok := pluginEntry.(plugin.PluginTestConnectionAPI); ok {
+               if err := v.TestConnection(connection.ConnectionId); err != nil 
{
+                       logger.Error(err, "plugin: %s, id: %d", 
connection.PluginName, connection.ConnectionId)
+                       return err
+               }
+               return nil
+       } else {
+               msg := fmt.Sprintf("plugin: %s doesn't impl test connection 
api", connection.PluginName)
+               return errors.Default.New(msg)
+       }
+}
diff --git a/backend/server/services/blueprint_makeplan_v200.go 
b/backend/server/services/blueprint_makeplan_v200.go
index 727e843de..960465069 100644
--- a/backend/server/services/blueprint_makeplan_v200.go
+++ b/backend/server/services/blueprint_makeplan_v200.go
@@ -121,11 +121,25 @@ func GeneratePlanJsonV200(
                }
        }
        var planForProjectMapping coreModels.PipelinePlan
+       var planForProjectTokenChecker coreModels.PipelinePlan
        if projectName != "" {
                p, err := plugin.GetPlugin("org")
                if err != nil {
                        return nil, errors.Default.Wrap(err, "get plugin org")
                }
+               if pluginBp, ok := p.(plugin.ProjectTokenChecker); ok {
+                       var simpleConns []plugin.ProjectTokenCheckerConnection
+                       for _, connection := range connections {
+                               simpleConns = append(simpleConns, 
plugin.ProjectTokenCheckerConnection{
+                                       PluginName:   connection.PluginName,
+                                       ConnectionId: connection.ConnectionId,
+                               })
+                       }
+                       planForProjectTokenChecker, err = 
pluginBp.MakePipeline(skipCollectors, projectName, simpleConns)
+                       if err != nil {
+                               return nil, errors.Default.Wrap(err, "org token 
checker make pipeline")
+                       }
+               }
                if pluginBp, ok := p.(plugin.ProjectMapper); ok {
                        planForProjectMapping, err = 
pluginBp.MapProject(projectName, scopes)
                        if err != nil {
@@ -134,6 +148,7 @@ func GeneratePlanJsonV200(
                }
        }
        plan := SequentializePipelinePlans(
+               planForProjectTokenChecker,
                planForProjectMapping,
                ParallelizePipelinePlans(sourcePlans...),
                ParallelizePipelinePlans(metricPlans...),
diff --git a/backend/server/services/project.go 
b/backend/server/services/project.go
index 8e9528d38..990dbae2e 100644
--- a/backend/server/services/project.go
+++ b/backend/server/services/project.go
@@ -217,7 +217,7 @@ func PatchProject(name string, body map[string]interface{}) 
(*models.ApiOutputPr
                return nil, err
        }
 
-       // allowed to changed the name
+       // allowed to change the name
        if projectInput.Name == "" {
                projectInput.Name = name
        }
diff --git a/backend/server/services/remote/models/plugin_remote.go 
b/backend/server/services/remote/models/plugin_remote.go
index 988283630..3c97d8021 100644
--- a/backend/server/services/remote/models/plugin_remote.go
+++ b/backend/server/services/remote/models/plugin_remote.go
@@ -30,4 +30,5 @@ type RemotePlugin interface {
        plugin.PluginModel
        plugin.PluginMigration
        plugin.PluginSource
+       plugin.PluginTestConnectionAPI
 }
diff --git a/backend/server/services/remote/plugin/plugin_impl.go 
b/backend/server/services/remote/plugin/plugin_impl.go
index 6242b1600..ea7e912c5 100644
--- a/backend/server/services/remote/plugin/plugin_impl.go
+++ b/backend/server/services/remote/plugin/plugin_impl.go
@@ -230,6 +230,11 @@ func (p *remotePluginImpl) ApiResources() 
map[string]map[string]plugin.ApiResour
        return p.resources
 }
 
+func (p *remotePluginImpl) TestConnection(id uint64) errors.Error {
+       _, err := 
p.resources["connections/:connectionId/test"]["POST"](api.GenerateTestingConnectionApiResourceInput(id))
+       return err
+}
+
 func (p *remotePluginImpl) OpenApiSpec() string {
        return p.openApiSpec
 }

Reply via email to