This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch feat#5841-3
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/feat#5841-3 by this push:
new 61a02dc5c feat: allow users to configure the sync modes for plugins
that support incremental sync.
61a02dc5c is described below
commit 61a02dc5c1b9756c38254c07c7505d74cda10f9f
Author: abeizn <[email protected]>
AuthorDate: Fri Sep 8 11:37:36 2023 +0800
feat: allow users to configure the sync modes for plugins that support
incremental sync.
---
backend/core/models/blueprint.go | 9 ++--
.../migrationscripts/20230907_add_full_sync.go | 56 ++++++++++++++++++++++
backend/core/models/migrationscripts/register.go | 1 +
backend/core/models/pipeline.go | 2 +
backend/core/plugin/plugin_blueprint.go | 2 +-
backend/core/plugin/plugin_task.go | 3 ++
backend/core/runner/directrun.go | 1 +
backend/core/runner/run_task.go | 15 ++++++
backend/helpers/pluginhelper/api/api_collector.go | 16 ++++++-
.../pluginhelper/api/api_collector_with_state.go | 4 ++
.../helpers/pluginhelper/api/graphql_collector.go | 7 ++-
backend/helpers/unithelper/dummy_subtaskcontext.go | 3 ++
backend/impls/context/default_task_context.go | 14 +++++-
backend/plugins/bamboo/api/blueprint_V200_test.go | 2 +-
backend/plugins/bamboo/api/blueprint_v200.go | 4 +-
backend/plugins/bamboo/impl/impl.go | 4 +-
.../plugins/bitbucket/api/blueprint_V200_test.go | 2 +-
backend/plugins/bitbucket/api/blueprint_v200.go | 4 +-
backend/plugins/bitbucket/impl/impl.go | 4 +-
backend/plugins/github/api/blueprint_V200_test.go | 2 +-
backend/plugins/github/api/blueprint_v200.go | 4 +-
backend/plugins/github/impl/impl.go | 4 +-
backend/plugins/gitlab/api/blueprint_V200_test.go | 2 +-
backend/plugins/gitlab/api/blueprint_v200.go | 4 +-
backend/plugins/gitlab/impl/impl.go | 4 +-
backend/plugins/jenkins/api/blueprint_v200.go | 4 +-
backend/plugins/jenkins/api/blueprint_v200_test.go | 2 +-
backend/plugins/jenkins/impl/impl.go | 4 +-
backend/plugins/jira/api/blueprint_v200.go | 4 +-
backend/plugins/jira/api/blueprint_v200_test.go | 2 +-
backend/plugins/jira/impl/impl.go | 4 +-
backend/plugins/pagerduty/api/blueprint_v200.go | 4 +-
backend/plugins/pagerduty/impl/impl.go | 4 +-
backend/plugins/sonarqube/api/blueprint_v200.go | 4 +-
.../plugins/sonarqube/api/blueprint_v200_test.go | 2 +-
backend/plugins/sonarqube/impl/impl.go | 4 +-
backend/plugins/tapd/api/blueprint_v200.go | 4 +-
backend/plugins/tapd/api/blueprint_v200_test.go | 2 +-
backend/plugins/tapd/impl/impl.go | 4 +-
backend/plugins/teambition/api/blueprint200.go | 4 +-
backend/plugins/teambition/impl/impl.go | 4 +-
backend/plugins/trello/api/blueprint_v200.go | 4 +-
backend/plugins/trello/impl/impl.go | 4 +-
backend/plugins/webhook/impl/impl.go | 2 +-
backend/plugins/zentao/api/blueprint_V200_test.go | 2 +-
backend/plugins/zentao/api/blueprint_v200.go | 4 +-
backend/plugins/zentao/impl/impl.go | 4 +-
backend/server/api/blueprints/blueprints.go | 13 +++--
backend/server/services/blueprint.go | 41 ++++++++++++----
backend/server/services/blueprint_makeplan_v200.go | 5 +-
.../services/blueprint_makeplan_v200_test.go | 4 +-
backend/server/services/pipeline_helper.go | 1 +
.../services/remote/plugin/plugin_extensions.go | 2 +-
backend/test/helper/client.go | 1 +
54 files changed, 227 insertions(+), 89 deletions(-)
diff --git a/backend/core/models/blueprint.go b/backend/core/models/blueprint.go
index 19d5efdf4..371b97210 100644
--- a/backend/core/models/blueprint.go
+++ b/backend/core/models/blueprint.go
@@ -41,6 +41,7 @@ type Blueprint struct {
BeforePlan PipelinePlan `json:"beforePlan"
gorm:"serializer:encdec"`
AfterPlan PipelinePlan `json:"afterPlan"
gorm:"serializer:encdec"`
TimeAfter *time.Time `json:"timeAfter"`
+ FullSync bool `json:"fullSync"`
Labels []string `json:"labels" gorm:"-"`
Connections []*BlueprintConnection `json:"connections" gorm:"-"`
common.Model `swaggerignore:"true"`
@@ -83,7 +84,9 @@ func (BlueprintScope) TableName() string {
return "_devlake_blueprint_scopes"
}
-type BlueprintSyncPolicy struct {
- SkipOnFail bool `json:"skipOnFail"`
- TimeAfter *time.Time `json:"timeAfter"`
+type SyncPolicy struct {
+ SkipOnFail bool `json:"skipOnFail"`
+ FullSync bool `json:"fullSync"`
+ SkipCollectors bool `json:"skipCollectors"`
+ TimeAfter *time.Time `json:"timeAfter"`
}
diff --git a/backend/core/models/migrationscripts/20230907_add_full_sync.go
b/backend/core/models/migrationscripts/20230907_add_full_sync.go
new file mode 100644
index 000000000..403e68892
--- /dev/null
+++ b/backend/core/models/migrationscripts/20230907_add_full_sync.go
@@ -0,0 +1,56 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+ "github.com/apache/incubator-devlake/core/context"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/helpers/migrationhelper"
+)
+
+type addFullSyncToBlueprint struct {
+ FullSync bool `json:"fullSync"`
+}
+
+func (*addFullSyncToBlueprint) TableName() string {
+ return "_devlake_blueprints"
+}
+
+type addFullSyncToPipeline struct {
+ FullSync bool `json:"fullSync"`
+}
+
+func (*addFullSyncToPipeline) TableName() string {
+ return "_devlake_pipelines"
+}
+
+type addFullSync struct{}
+
+func (*addFullSync) Up(basicRes context.BasicRes) errors.Error {
+ return migrationhelper.AutoMigrateTables(
+ basicRes,
+ &addFullSyncToBlueprint{},
+ &addFullSyncToPipeline{},
+ )
+}
+
+func (*addFullSync) Version() uint64 {
+ return 20230907000041
+}
+
+func (*addFullSync) Name() string {
+ return "add full_sync to _devlake_blueprints and _devlake_pipelines
table"
+}
diff --git a/backend/core/models/migrationscripts/register.go
b/backend/core/models/migrationscripts/register.go
index 2f372318c..c3afbeaba 100644
--- a/backend/core/models/migrationscripts/register.go
+++ b/backend/core/models/migrationscripts/register.go
@@ -92,5 +92,6 @@ func All() []plugin.MigrationScript {
new(modifyCicdPipelinesToText),
new(dropTapStateTable),
new(normalizeBpSettings),
+ new(addFullSync),
}
}
diff --git a/backend/core/models/pipeline.go b/backend/core/models/pipeline.go
index 92824b8a1..1bbb21086 100644
--- a/backend/core/models/pipeline.go
+++ b/backend/core/models/pipeline.go
@@ -70,6 +70,7 @@ type Pipeline struct {
Stage int `json:"stage"`
Labels []string `json:"labels" gorm:"-"`
SkipOnFail bool `json:"skipOnFail"`
+ FullSync bool `json:"fullSync"`
}
// We use a 2D array because the request body must be an array of a set of
tasks
@@ -79,6 +80,7 @@ type NewPipeline struct {
Plan PipelinePlan `json:"plan" swaggertype:"array,string"
example:"please check api /pipelines/<PLUGIN_NAME>/pipeline-plan"`
Labels []string `json:"labels"`
SkipOnFail bool `json:"skipOnFail"`
+ FullSync bool `json:"fullSync"`
BlueprintId uint64
}
diff --git a/backend/core/plugin/plugin_blueprint.go
b/backend/core/plugin/plugin_blueprint.go
index 6831f90f0..db3412540 100644
--- a/backend/core/plugin/plugin_blueprint.go
+++ b/backend/core/plugin/plugin_blueprint.go
@@ -72,7 +72,7 @@ type DataSourcePluginBlueprintV200 interface {
MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*models.BlueprintScope,
- syncPolicy models.BlueprintSyncPolicy,
+ syncPolicy *models.SyncPolicy,
) (models.PipelinePlan, []Scope, errors.Error)
}
diff --git a/backend/core/plugin/plugin_task.go
b/backend/core/plugin/plugin_task.go
index d7dbc14bc..3bb44b5bb 100644
--- a/backend/core/plugin/plugin_task.go
+++ b/backend/core/plugin/plugin_task.go
@@ -22,6 +22,7 @@ import (
corecontext "github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/models"
)
type ProgressType int
@@ -62,6 +63,8 @@ type SubTaskContext interface {
type TaskContext interface {
ExecContext
SetData(data interface{})
+ SetSyncPolicy(syncPolicy *models.SyncPolicy)
+ SyncPolicy() *models.SyncPolicy
SubTaskContext(subtask string) (SubTaskContext, errors.Error)
}
diff --git a/backend/core/runner/directrun.go b/backend/core/runner/directrun.go
index cb6935b9e..7578d8f39 100644
--- a/backend/core/runner/directrun.go
+++ b/backend/core/runner/directrun.go
@@ -90,6 +90,7 @@ func DirectRun(cmd *cobra.Command, args []string, pluginTask
plugin.PluginTask,
task,
pluginTask,
nil,
+ nil,
)
if err != nil {
panic(err)
diff --git a/backend/core/runner/run_task.go b/backend/core/runner/run_task.go
index c0f80fa9f..2caf42370 100644
--- a/backend/core/runner/run_task.go
+++ b/backend/core/runner/run_task.go
@@ -53,6 +53,16 @@ func RunTask(
if err := db.First(dbPipeline, dal.Where("id = ? ", task.PipelineId));
err != nil {
return err
}
+ blueprint := &models.Blueprint{}
+ if err := db.First(blueprint, dal.Where("id = ? ",
dbPipeline.BlueprintId)); err != nil {
+ return err
+ }
+ syncPolicy := &models.SyncPolicy{}
+ syncPolicy.TimeAfter = blueprint.TimeAfter
+ // Prioritize the configuration of pipeline
+ syncPolicy.SkipOnFail = dbPipeline.SkipOnFail
+ syncPolicy.FullSync = dbPipeline.FullSync
+
logger, err := getTaskLogger(basicRes.GetLogger(), task)
if err != nil {
return err
@@ -136,6 +146,7 @@ func RunTask(
basicRes.ReplaceLogger(logger),
task,
progress,
+ syncPolicy,
)
return err
}
@@ -146,6 +157,7 @@ func RunPluginTask(
basicRes context.BasicRes,
task *models.Task,
progress chan plugin.RunningProgress,
+ syncPolicy *models.SyncPolicy,
) errors.Error {
pluginMeta, err := plugin.GetPlugin(task.Plugin)
if err != nil {
@@ -161,6 +173,7 @@ func RunPluginTask(
task,
pluginTask,
progress,
+ syncPolicy,
)
}
@@ -171,6 +184,7 @@ func RunPluginSubTasks(
task *models.Task,
pluginTask plugin.PluginTask,
progress chan plugin.RunningProgress,
+ syncPolicy *models.SyncPolicy,
) errors.Error {
logger := basicRes.GetLogger()
logger.Info("start plugin")
@@ -236,6 +250,7 @@ func RunPluginSubTasks(
if err != nil {
return errors.Default.Wrap(err, fmt.Sprintf("error preparing
task data for %s", task.Plugin))
}
+ taskCtx.SetSyncPolicy(syncPolicy)
taskCtx.SetData(taskData)
// execute subtasks in order
diff --git a/backend/helpers/pluginhelper/api/api_collector.go
b/backend/helpers/pluginhelper/api/api_collector.go
index 0cd4b1c4d..b9073c861 100644
--- a/backend/helpers/pluginhelper/api/api_collector.go
+++ b/backend/helpers/pluginhelper/api/api_collector.go
@@ -21,7 +21,6 @@ import (
"bytes"
"encoding/json"
"fmt"
- "github.com/apache/incubator-devlake/core/plugin"
"io"
"net/http"
"net/url"
@@ -29,6 +28,8 @@ import (
"text/template"
"time"
+ "github.com/apache/incubator-devlake/core/plugin"
+
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/helpers/pluginhelper/common"
@@ -160,8 +161,19 @@ func (collector *ApiCollector) Execute() errors.Error {
return errors.Default.Wrap(err, "error auto-migrating
collector")
}
+ fmt.Println("xxxxxxxx----------------------")
+ isIncremental := collector.args.Incremental
+ taskContext := collector.args.Ctx.TaskContext()
+ fmt.Println(taskContext.SyncPolicy().FullSync)
+ fmt.Println(taskContext.SyncPolicy().SkipCollectors)
+ fmt.Println(taskContext.SyncPolicy().TimeAfter)
+ fmt.Println(taskContext.SyncPolicy().SkipOnFail)
+ if taskContext.SyncPolicy().FullSync {
+ fmt.Println("full sync----------------------")
+ isIncremental = false
+ }
// flush data if not incremental collection
- if !collector.args.Incremental {
+ if !isIncremental {
err = db.Delete(&RawData{}, dal.From(collector.table),
dal.Where("params = ?", collector.params))
if err != nil {
return errors.Default.Wrap(err, "error deleting data
from collector")
diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go
b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index ecd080355..e2ae8f987 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -74,7 +74,11 @@ func (m *ApiCollectorStateManager) IsIncremental() bool {
prevSyncTime := m.LatestState.LatestSuccessStart
prevTimeAfter := m.LatestState.TimeAfter
currTimeAfter := m.TimeAfter
+ fullSync := m.Ctx.TaskContext().SyncPolicy().FullSync
+ if fullSync {
+ return false
+ }
if prevSyncTime == nil {
return false
}
diff --git a/backend/helpers/pluginhelper/api/graphql_collector.go
b/backend/helpers/pluginhelper/api/graphql_collector.go
index 0347d5154..3a4cc571f 100644
--- a/backend/helpers/pluginhelper/api/graphql_collector.go
+++ b/backend/helpers/pluginhelper/api/graphql_collector.go
@@ -138,8 +138,13 @@ func (collector *GraphqlCollector) Execute() errors.Error {
divider := NewBatchSaveDivider(collector.args.Ctx,
collector.args.BatchSize, collector.table, collector.params)
+ isIncremental := collector.args.Incremental
+ taskContext := collector.args.Ctx.TaskContext()
+ if taskContext.SyncPolicy().FullSync {
+ isIncremental = false
+ }
// flush data if not incremental collection
- if collector.args.Incremental {
+ if isIncremental {
// re extract data for new scope config
err = collector.ExtractExistRawData(divider)
if err != nil {
diff --git a/backend/helpers/unithelper/dummy_subtaskcontext.go
b/backend/helpers/unithelper/dummy_subtaskcontext.go
index 264050105..0fc966417 100644
--- a/backend/helpers/unithelper/dummy_subtaskcontext.go
+++ b/backend/helpers/unithelper/dummy_subtaskcontext.go
@@ -31,5 +31,8 @@ func DummySubTaskContext(db dal.Dal)
*mockplugin.SubTaskContext {
mockCtx.On("SetProgress", mock.Anything, mock.Anything)
mockCtx.On("IncProgress", mock.Anything, mock.Anything)
mockCtx.On("GetName").Return("test")
+ mockTaskContext := new(mockplugin.TaskContext)
+ mockTaskContext.On("SyncPolicy").Return(nil)
+ mockCtx.On("TaskContext").Return(mockTaskContext)
return mockCtx
}
diff --git a/backend/impls/context/default_task_context.go
b/backend/impls/context/default_task_context.go
index 1a6b79529..f6d67c909 100644
--- a/backend/impls/context/default_task_context.go
+++ b/backend/impls/context/default_task_context.go
@@ -20,10 +20,12 @@ package context
import (
gocontext "context"
"fmt"
+ "time"
+
"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/plugin"
- "time"
)
// DefaultTaskContext is TaskContext default implementation
@@ -31,6 +33,7 @@ type DefaultTaskContext struct {
*defaultExecContext
subtasks map[string]bool
subtaskCtxs map[string]*DefaultSubTaskContext
+ syncPolicy *models.SyncPolicy
}
// SetProgress FIXME ...
@@ -45,6 +48,14 @@ func (c *DefaultTaskContext) IncProgress(quantity int) {
c.BasicRes.GetLogger().Info("finished step: %d / %d", c.current,
c.total)
}
+func (c *DefaultTaskContext) SetSyncPolicy(syncPolicy *models.SyncPolicy) {
+ c.syncPolicy = syncPolicy
+}
+
+func (c *DefaultTaskContext) SyncPolicy() *models.SyncPolicy {
+ return c.syncPolicy
+}
+
// SubTaskContext FIXME ...
func (c *DefaultTaskContext) SubTaskContext(subtask string)
(plugin.SubTaskContext, errors.Error) {
// no need to lock at this point because subtasks is written only once
@@ -86,6 +97,7 @@ func NewDefaultTaskContext(
newDefaultExecContext(ctx, basicRes, name, nil, progress),
subtasks,
make(map[string]*DefaultSubTaskContext),
+ nil,
}
}
diff --git a/backend/plugins/bamboo/api/blueprint_V200_test.go
b/backend/plugins/bamboo/api/blueprint_V200_test.go
index ee70a2cae..5fb5c8a86 100644
--- a/backend/plugins/bamboo/api/blueprint_V200_test.go
+++ b/backend/plugins/bamboo/api/blueprint_V200_test.go
@@ -48,7 +48,7 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
const testScopeConfigName string = "bamboo scope config"
const testProxy string = ""
- syncPolicy := &coreModels.BlueprintSyncPolicy{}
+ syncPolicy := &coreModels.SyncPolicy{}
bpScopes := []*coreModels.BlueprintScope{
{
ScopeId: testKey,
diff --git a/backend/plugins/bamboo/api/blueprint_v200.go
b/backend/plugins/bamboo/api/blueprint_v200.go
index 811db19ba..bbd2a3ae8 100644
--- a/backend/plugins/bamboo/api/blueprint_v200.go
+++ b/backend/plugins/bamboo/api/blueprint_v200.go
@@ -36,7 +36,7 @@ func MakePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
scope []*coreModels.BlueprintScope,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
var err errors.Error
connection := new(models.BambooConnection)
@@ -84,7 +84,7 @@ func makeScopeV200(connectionId uint64, scopes
[]*coreModels.BlueprintScope) ([]
func makePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
scopes []*coreModels.BlueprintScope,
- connection *models.BambooConnection, syncPolicy
*coreModels.BlueprintSyncPolicy,
+ connection *models.BambooConnection, syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
plans := make(coreModels.PipelinePlan, 0, len(scopes))
for _, scope := range scopes {
diff --git a/backend/plugins/bamboo/impl/impl.go
b/backend/plugins/bamboo/impl/impl.go
index cd31c2bbb..69d77b3f4 100644
--- a/backend/plugins/bamboo/impl/impl.go
+++ b/backend/plugins/bamboo/impl/impl.go
@@ -68,9 +68,9 @@ func (p Bamboo) ScopeConfig() dal.Tabler {
func (p Bamboo) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
- return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes,
&syncPolicy)
+ return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes,
syncPolicy)
}
func (p Bamboo) GetTablesInfo() []dal.Tabler {
diff --git a/backend/plugins/bitbucket/api/blueprint_V200_test.go
b/backend/plugins/bitbucket/api/blueprint_V200_test.go
index 5de59d64b..0a878c2e1 100644
--- a/backend/plugins/bitbucket/api/blueprint_V200_test.go
+++ b/backend/plugins/bitbucket/api/blueprint_V200_test.go
@@ -68,7 +68,7 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
}
bpScopes := make([]*coreModels.BlueprintScope, 0)
bpScopes = append(bpScopes, bs)
- syncPolicy := &coreModels.BlueprintSyncPolicy{}
+ syncPolicy := &coreModels.SyncPolicy{}
plan := make(coreModels.PipelinePlan, len(bpScopes))
plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes,
connection, syncPolicy)
diff --git a/backend/plugins/bitbucket/api/blueprint_v200.go
b/backend/plugins/bitbucket/api/blueprint_v200.go
index cfa165b99..e438a4a68 100644
--- a/backend/plugins/bitbucket/api/blueprint_v200.go
+++ b/backend/plugins/bitbucket/api/blueprint_v200.go
@@ -39,7 +39,7 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
// get the connection info for url
connection := &models.BitbucketConnection{}
@@ -66,7 +66,7 @@ func makeDataSourcePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connection *models.BitbucketConnection,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, bpScope := range bpScopes {
stage := plan[i]
diff --git a/backend/plugins/bitbucket/impl/impl.go
b/backend/plugins/bitbucket/impl/impl.go
index ca5ef0836..275e97377 100644
--- a/backend/plugins/bitbucket/impl/impl.go
+++ b/backend/plugins/bitbucket/impl/impl.go
@@ -202,8 +202,8 @@ func (p Bitbucket) MigrationScripts()
[]plugin.MigrationScript {
func (p Bitbucket) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy coreModels.BlueprintSyncPolicy) (pp coreModels.PipelinePlan,
sc []plugin.Scope, err errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, &syncPolicy)
+ syncPolicy *coreModels.SyncPolicy) (pp coreModels.PipelinePlan, sc
[]plugin.Scope, err errors.Error) {
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
}
func (p Bitbucket) ApiResources()
map[string]map[string]plugin.ApiResourceHandler {
diff --git a/backend/plugins/github/api/blueprint_V200_test.go
b/backend/plugins/github/api/blueprint_V200_test.go
index dd1712ec6..718bcb5b2 100644
--- a/backend/plugins/github/api/blueprint_V200_test.go
+++ b/backend/plugins/github/api/blueprint_V200_test.go
@@ -69,7 +69,7 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
}
bpScopes := make([]*coreModels.BlueprintScope, 0)
bpScopes = append(bpScopes, bs)
- syncPolicy := &coreModels.BlueprintSyncPolicy{}
+ syncPolicy := &coreModels.SyncPolicy{}
plan := make(coreModels.PipelinePlan, len(bpScopes))
plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes,
connection, syncPolicy)
diff --git a/backend/plugins/github/api/blueprint_v200.go
b/backend/plugins/github/api/blueprint_v200.go
index eaa62a8c4..e405e52c3 100644
--- a/backend/plugins/github/api/blueprint_v200.go
+++ b/backend/plugins/github/api/blueprint_v200.go
@@ -47,7 +47,7 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
// get the connection info for url
connection := &models.GithubConnection{}
@@ -81,7 +81,7 @@ func makeDataSourcePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connection *models.GithubConnection,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, bpScope := range bpScopes {
stage := plan[i]
diff --git a/backend/plugins/github/impl/impl.go
b/backend/plugins/github/impl/impl.go
index 97cbe9364..12481fe0e 100644
--- a/backend/plugins/github/impl/impl.go
+++ b/backend/plugins/github/impl/impl.go
@@ -226,9 +226,9 @@ func (p Github) ApiResources()
map[string]map[string]plugin.ApiResourceHandler {
func (p Github) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, &syncPolicy)
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
}
func (p Github) Close(taskCtx plugin.TaskContext) errors.Error {
diff --git a/backend/plugins/gitlab/api/blueprint_V200_test.go
b/backend/plugins/gitlab/api/blueprint_V200_test.go
index da2825aab..99bc27062 100644
--- a/backend/plugins/gitlab/api/blueprint_V200_test.go
+++ b/backend/plugins/gitlab/api/blueprint_V200_test.go
@@ -52,7 +52,7 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
const testScopeConfigName string = "gitlab scope config"
const testProxy string = ""
- syncPolicy := &coreModels.BlueprintSyncPolicy{}
+ syncPolicy := &coreModels.SyncPolicy{}
bpScopes := []*coreModels.BlueprintScope{
{
ScopeId: strconv.Itoa(testID),
diff --git a/backend/plugins/gitlab/api/blueprint_v200.go
b/backend/plugins/gitlab/api/blueprint_v200.go
index f4513951b..d7197cb5d 100644
--- a/backend/plugins/gitlab/api/blueprint_v200.go
+++ b/backend/plugins/gitlab/api/blueprint_v200.go
@@ -46,7 +46,7 @@ func MakePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
scope []*coreModels.BlueprintScope,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
var err errors.Error
connection := new(models.GitlabConnection)
@@ -114,7 +114,7 @@ func makeScopeV200(connectionId uint64, scopes
[]*coreModels.BlueprintScope) ([]
func makePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
scopes []*coreModels.BlueprintScope,
- connection *models.GitlabConnection, syncPolicy
*coreModels.BlueprintSyncPolicy,
+ connection *models.GitlabConnection, syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
plans := make(coreModels.PipelinePlan, 0, 3*len(scopes))
for _, scope := range scopes {
diff --git a/backend/plugins/gitlab/impl/impl.go
b/backend/plugins/gitlab/impl/impl.go
index 0a9008fa4..36c7c2ac2 100644
--- a/backend/plugins/gitlab/impl/impl.go
+++ b/backend/plugins/gitlab/impl/impl.go
@@ -77,9 +77,9 @@ func (p Gitlab) ScopeConfig() dal.Tabler {
func (p Gitlab) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
- return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes,
&syncPolicy)
+ return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes,
syncPolicy)
}
func (p Gitlab) GetTablesInfo() []dal.Tabler {
diff --git a/backend/plugins/jenkins/api/blueprint_v200.go
b/backend/plugins/jenkins/api/blueprint_v200.go
index 00ba31fd1..7dca09a03 100644
--- a/backend/plugins/jenkins/api/blueprint_v200.go
+++ b/backend/plugins/jenkins/api/blueprint_v200.go
@@ -35,7 +35,7 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
plan := make(coreModels.PipelinePlan, len(bpScopes))
plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connectionId, syncPolicy)
@@ -55,7 +55,7 @@ func makeDataSourcePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connectionId uint64,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, bpScope := range bpScopes {
stage := plan[i]
diff --git a/backend/plugins/jenkins/api/blueprint_v200_test.go
b/backend/plugins/jenkins/api/blueprint_v200_test.go
index 463662365..bfd61f573 100644
--- a/backend/plugins/jenkins/api/blueprint_v200_test.go
+++ b/backend/plugins/jenkins/api/blueprint_v200_test.go
@@ -42,7 +42,7 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
bs := &coreModels.BlueprintScope{
ScopeId: "a/b/ccc",
}
- syncPolicy := &coreModels.BlueprintSyncPolicy{}
+ syncPolicy := &coreModels.SyncPolicy{}
bpScopes := make([]*coreModels.BlueprintScope, 0)
bpScopes = append(bpScopes, bs)
diff --git a/backend/plugins/jenkins/impl/impl.go
b/backend/plugins/jenkins/impl/impl.go
index 123e7ddb5..01970c5dc 100644
--- a/backend/plugins/jenkins/impl/impl.go
+++ b/backend/plugins/jenkins/impl/impl.go
@@ -173,9 +173,9 @@ func (p Jenkins) MigrationScripts()
[]plugin.MigrationScript {
func (p Jenkins) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, &syncPolicy)
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
}
func (p Jenkins) ApiResources()
map[string]map[string]plugin.ApiResourceHandler {
diff --git a/backend/plugins/jira/api/blueprint_v200.go
b/backend/plugins/jira/api/blueprint_v200.go
index 737c77d33..20be17970 100644
--- a/backend/plugins/jira/api/blueprint_v200.go
+++ b/backend/plugins/jira/api/blueprint_v200.go
@@ -35,7 +35,7 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
plan := make(coreModels.PipelinePlan, len(bpScopes))
plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connectionId, syncPolicy)
@@ -55,7 +55,7 @@ func makeDataSourcePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connectionId uint64,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, bpScope := range bpScopes {
stage := plan[i]
diff --git a/backend/plugins/jira/api/blueprint_v200_test.go
b/backend/plugins/jira/api/blueprint_v200_test.go
index e8e64fe17..76b9306f6 100644
--- a/backend/plugins/jira/api/blueprint_v200_test.go
+++ b/backend/plugins/jira/api/blueprint_v200_test.go
@@ -42,7 +42,7 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
bs := &coreModels.BlueprintScope{
ScopeId: "10",
}
- syncPolicy := &coreModels.BlueprintSyncPolicy{}
+ syncPolicy := &coreModels.SyncPolicy{}
bpScopes := make([]*coreModels.BlueprintScope, 0)
bpScopes = append(bpScopes, bs)
plan := make(coreModels.PipelinePlan, len(bpScopes))
diff --git a/backend/plugins/jira/impl/impl.go
b/backend/plugins/jira/impl/impl.go
index f74d71986..432a4142a 100644
--- a/backend/plugins/jira/impl/impl.go
+++ b/backend/plugins/jira/impl/impl.go
@@ -265,9 +265,9 @@ func (p Jira) PrepareTaskData(taskCtx plugin.TaskContext,
options map[string]int
func (p Jira) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, &syncPolicy)
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
}
func (p Jira) RootPkgPath() string {
diff --git a/backend/plugins/pagerduty/api/blueprint_v200.go
b/backend/plugins/pagerduty/api/blueprint_v200.go
index 5087a1ff0..c213e4755 100644
--- a/backend/plugins/pagerduty/api/blueprint_v200.go
+++ b/backend/plugins/pagerduty/api/blueprint_v200.go
@@ -36,7 +36,7 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
// get the connection info for url
connection := &models.PagerDutyConnection{}
@@ -63,7 +63,7 @@ func makeDataSourcePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connection *models.PagerDutyConnection,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, bpScope := range bpScopes {
// get board and scope config from db
diff --git a/backend/plugins/pagerduty/impl/impl.go
b/backend/plugins/pagerduty/impl/impl.go
index 0314ddb9a..14737971d 100644
--- a/backend/plugins/pagerduty/impl/impl.go
+++ b/backend/plugins/pagerduty/impl/impl.go
@@ -177,9 +177,9 @@ func (p PagerDuty) ApiResources()
map[string]map[string]plugin.ApiResourceHandle
func (p PagerDuty) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, &syncPolicy)
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
}
func (p PagerDuty) Close(taskCtx plugin.TaskContext) errors.Error {
diff --git a/backend/plugins/sonarqube/api/blueprint_v200.go
b/backend/plugins/sonarqube/api/blueprint_v200.go
index fa4825ce3..95f886c20 100644
--- a/backend/plugins/sonarqube/api/blueprint_v200.go
+++ b/backend/plugins/sonarqube/api/blueprint_v200.go
@@ -38,7 +38,7 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
plan := make(coreModels.PipelinePlan, len(bpScopes))
plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connectionId, syncPolicy)
@@ -58,7 +58,7 @@ func makeDataSourcePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connectionId uint64,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, bpScope := range bpScopes {
stage := plan[i]
diff --git a/backend/plugins/sonarqube/api/blueprint_v200_test.go
b/backend/plugins/sonarqube/api/blueprint_v200_test.go
index f9d9f760e..fd1937119 100644
--- a/backend/plugins/sonarqube/api/blueprint_v200_test.go
+++ b/backend/plugins/sonarqube/api/blueprint_v200_test.go
@@ -41,7 +41,7 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
bs := &coreModels.BlueprintScope{
ScopeId: "f5a50c63-2e8f-4107-9014-853f6f467757",
}
- syncPolicy := &coreModels.BlueprintSyncPolicy{}
+ syncPolicy := &coreModels.SyncPolicy{}
bpScopes := make([]*coreModels.BlueprintScope, 0)
bpScopes = append(bpScopes, bs)
plan := make(coreModels.PipelinePlan, len(bpScopes))
diff --git a/backend/plugins/sonarqube/impl/impl.go
b/backend/plugins/sonarqube/impl/impl.go
index 5ec0919df..ac2743104 100644
--- a/backend/plugins/sonarqube/impl/impl.go
+++ b/backend/plugins/sonarqube/impl/impl.go
@@ -200,9 +200,9 @@ func (p Sonarqube) ApiResources()
map[string]map[string]plugin.ApiResourceHandle
func (p Sonarqube) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, &syncPolicy)
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
}
func (p Sonarqube) Close(taskCtx plugin.TaskContext) errors.Error {
diff --git a/backend/plugins/tapd/api/blueprint_v200.go
b/backend/plugins/tapd/api/blueprint_v200.go
index 82fe90214..41aa2cfa4 100644
--- a/backend/plugins/tapd/api/blueprint_v200.go
+++ b/backend/plugins/tapd/api/blueprint_v200.go
@@ -36,7 +36,7 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
plan := make(coreModels.PipelinePlan, len(bpScopes))
plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connectionId, syncPolicy)
@@ -56,7 +56,7 @@ func makeDataSourcePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connectionId uint64,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, bpScope := range bpScopes {
stage := plan[i]
diff --git a/backend/plugins/tapd/api/blueprint_v200_test.go
b/backend/plugins/tapd/api/blueprint_v200_test.go
index c7aa2647b..dac1d9a43 100644
--- a/backend/plugins/tapd/api/blueprint_v200_test.go
+++ b/backend/plugins/tapd/api/blueprint_v200_test.go
@@ -42,7 +42,7 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
bs := &coreModels.BlueprintScope{
ScopeId: "10",
}
- syncPolicy := &coreModels.BlueprintSyncPolicy{}
+ syncPolicy := &coreModels.SyncPolicy{}
bpScopes := make([]*coreModels.BlueprintScope, 0)
bpScopes = append(bpScopes, bs)
plan := make(coreModels.PipelinePlan, len(bpScopes))
diff --git a/backend/plugins/tapd/impl/impl.go
b/backend/plugins/tapd/impl/impl.go
index bff9aa5fa..f72a81a4c 100644
--- a/backend/plugins/tapd/impl/impl.go
+++ b/backend/plugins/tapd/impl/impl.go
@@ -270,9 +270,9 @@ func (p Tapd) PrepareTaskData(taskCtx plugin.TaskContext,
options map[string]int
func (p Tapd) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, &syncPolicy)
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
}
func (p Tapd) RootPkgPath() string {
diff --git a/backend/plugins/teambition/api/blueprint200.go
b/backend/plugins/teambition/api/blueprint200.go
index 52e8fbb92..6ee599725 100644
--- a/backend/plugins/teambition/api/blueprint200.go
+++ b/backend/plugins/teambition/api/blueprint200.go
@@ -37,7 +37,7 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
plan := make(coreModels.PipelinePlan, len(bpScopes))
plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connectionId, syncPolicy)
@@ -57,7 +57,7 @@ func makeDataSourcePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connectionId uint64,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, bpScope := range bpScopes {
stage := plan[i]
diff --git a/backend/plugins/teambition/impl/impl.go
b/backend/plugins/teambition/impl/impl.go
index e499a0294..2aeb2f11a 100644
--- a/backend/plugins/teambition/impl/impl.go
+++ b/backend/plugins/teambition/impl/impl.go
@@ -122,9 +122,9 @@ func (p Teambition) SubTaskMetas() []plugin.SubTaskMeta {
func (p Teambition) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, &syncPolicy)
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
}
func (p Teambition) PrepareTaskData(taskCtx plugin.TaskContext, options
map[string]interface{}) (interface{}, errors.Error) {
diff --git a/backend/plugins/trello/api/blueprint_v200.go
b/backend/plugins/trello/api/blueprint_v200.go
index d2b79329b..4329e37df 100644
--- a/backend/plugins/trello/api/blueprint_v200.go
+++ b/backend/plugins/trello/api/blueprint_v200.go
@@ -38,7 +38,7 @@ func MakePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
scope []*coreModels.BlueprintScope,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
scopes, err := makeScopeV200(connectionId, scope)
if err != nil {
@@ -81,7 +81,7 @@ func makePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
plan coreModels.PipelinePlan,
scopes []*coreModels.BlueprintScope,
- connectionId uint64, syncPolicy *coreModels.BlueprintSyncPolicy,
+ connectionId uint64, syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, scope := range scopes {
stage := plan[i]
diff --git a/backend/plugins/trello/impl/impl.go
b/backend/plugins/trello/impl/impl.go
index 532c69794..39b177a0a 100644
--- a/backend/plugins/trello/impl/impl.go
+++ b/backend/plugins/trello/impl/impl.go
@@ -195,9 +195,9 @@ func (p Trello) ApiResources()
map[string]map[string]plugin.ApiResourceHandler {
func (p Trello) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
- return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes,
&syncPolicy)
+ return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes,
syncPolicy)
}
func (p Trello) Close(taskCtx plugin.TaskContext) errors.Error {
diff --git a/backend/plugins/webhook/impl/impl.go
b/backend/plugins/webhook/impl/impl.go
index 46a01715e..fd56f3160 100644
--- a/backend/plugins/webhook/impl/impl.go
+++ b/backend/plugins/webhook/impl/impl.go
@@ -62,7 +62,7 @@ func (p Webhook) GetTablesInfo() []dal.Tabler {
func (p Webhook) MakeDataSourcePipelinePlanV200(
connectionId uint64,
_ []*coreModels.BlueprintScope,
- _ coreModels.BlueprintSyncPolicy,
+ _ coreModels.SyncPolicy,
) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
return api.MakeDataSourcePipelinePlanV200(connectionId)
}
diff --git a/backend/plugins/zentao/api/blueprint_V200_test.go
b/backend/plugins/zentao/api/blueprint_V200_test.go
index b5fe6b10f..b379b7e3d 100644
--- a/backend/plugins/zentao/api/blueprint_V200_test.go
+++ b/backend/plugins/zentao/api/blueprint_V200_test.go
@@ -71,7 +71,7 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
}*/
bpScopes := make([]*coreModels.BlueprintScope, 0)
bpScopes = append(bpScopes, bs)
- syncPolicy := &coreModels.BlueprintSyncPolicy{}
+ syncPolicy := &coreModels.SyncPolicy{}
plan := make(coreModels.PipelinePlan, len(bpScopes))
plan, scopes, err := makePipelinePlanV200(nil, plan, bpScopes,
connection, syncPolicy)
diff --git a/backend/plugins/zentao/api/blueprint_v200.go
b/backend/plugins/zentao/api/blueprint_v200.go
index 6d53f0b76..b316cff43 100644
--- a/backend/plugins/zentao/api/blueprint_v200.go
+++ b/backend/plugins/zentao/api/blueprint_v200.go
@@ -36,7 +36,7 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
// get the connection info for url
connection := &models.ZentaoConnection{}
@@ -59,7 +59,7 @@ func makePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connection *models.ZentaoConnection,
- syncPolicy *coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
domainScopes := make([]plugin.Scope, 0)
for i, bpScope := range bpScopes {
diff --git a/backend/plugins/zentao/impl/impl.go
b/backend/plugins/zentao/impl/impl.go
index a65ac6671..cbea91b34 100644
--- a/backend/plugins/zentao/impl/impl.go
+++ b/backend/plugins/zentao/impl/impl.go
@@ -283,9 +283,9 @@ func (p Zentao) ApiResources()
map[string]map[string]plugin.ApiResourceHandler {
func (p Zentao) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, &syncPolicy)
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
}
func (p Zentao) Close(taskCtx plugin.TaskContext) errors.Error {
diff --git a/backend/server/api/blueprints/blueprints.go
b/backend/server/api/blueprints/blueprints.go
index ef2afc89c..389247abb 100644
--- a/backend/server/api/blueprints/blueprints.go
+++ b/backend/server/api/blueprints/blueprints.go
@@ -169,6 +169,7 @@ func Patch(c *gin.Context) {
// @Accept application/json
// @Param blueprintId path string true "blueprintId"
// @Param skipCollectors body bool false "skipCollectors"
+// @Param fullSync body bool false "fullSync"
// @Success 200 {object} models.Pipeline
// @Failure 400 {object} shared.ApiBody "Bad Request"
// @Failure 500 {object} shared.ApiBody "Internal Error"
@@ -181,21 +182,19 @@ func Trigger(c *gin.Context) {
return
}
- var body struct {
- SkipCollectors bool `json:"skipCollectors"`
- }
-
+ syncPolicy := &models.SyncPolicy{}
if c.Request.Body == nil || c.Request.ContentLength == 0 {
- body.SkipCollectors = false
+ syncPolicy.SkipCollectors = false
+ syncPolicy.FullSync = false
} else {
- err = c.ShouldBindJSON(&body)
+ err = c.ShouldBindJSON(syncPolicy)
if err != nil {
shared.ApiOutputError(c, errors.BadInput.Wrap(err,
"error binding request body"))
return
}
}
- pipeline, err := services.TriggerBlueprint(id, body.SkipCollectors)
+ pipeline, err := services.TriggerBlueprint(id, syncPolicy)
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error
triggering blueprint"))
return
diff --git a/backend/server/services/blueprint.go
b/backend/server/services/blueprint.go
index b644a807d..29246b43f 100644
--- a/backend/server/services/blueprint.go
+++ b/backend/server/services/blueprint.go
@@ -53,7 +53,12 @@ type BlueprintJob struct {
func (bj BlueprintJob) Run() {
blueprint := bj.Blueprint
- pipeline, err := createPipelineByBlueprint(blueprint, false)
+ syncPolicy := &models.SyncPolicy{
+ TimeAfter: blueprint.TimeAfter,
+ FullSync: blueprint.FullSync,
+ SkipOnFail: blueprint.SkipOnFail,
+ }
+ pipeline, err := createPipelineByBlueprint(blueprint, syncPolicy)
if err == ErrEmptyPlan {
blueprintLog.Info("Empty plan, blueprint id:[%d] blueprint
name:[%s]", blueprint.ID, blueprint.Name)
return
@@ -161,8 +166,13 @@ func validateBlueprintAndMakePlan(blueprint
*models.Blueprint) errors.Error {
return errors.BadInput.New("invalid plan")
}
} else if blueprint.Mode == models.BLUEPRINT_MODE_NORMAL {
+ syncPolicy := &models.SyncPolicy{
+ TimeAfter: blueprint.TimeAfter,
+ FullSync: blueprint.FullSync,
+ SkipOnFail: blueprint.SkipOnFail,
+ }
var e errors.Error
- blueprint.Plan, e = MakePlanForBlueprint(blueprint, false)
+ blueprint.Plan, e = MakePlanForBlueprint(blueprint, syncPolicy)
if err != nil {
return e
}
@@ -261,11 +271,14 @@ func ReloadBlueprints(c *cron.Cron) errors.Error {
return nil
}
-func createPipelineByBlueprint(blueprint *models.Blueprint, skipCollectors
bool) (*models.Pipeline, errors.Error) {
+func createPipelineByBlueprint(blueprint *models.Blueprint, syncPolicy
*models.SyncPolicy) (*models.Pipeline, errors.Error) {
var plan models.PipelinePlan
var err errors.Error
+ if syncPolicy != nil && syncPolicy.FullSync {
+ blueprint.FullSync = true
+ }
if blueprint.Mode == models.BLUEPRINT_MODE_NORMAL {
- plan, err = MakePlanForBlueprint(blueprint, skipCollectors)
+ plan, err = MakePlanForBlueprint(blueprint, syncPolicy)
if err != nil {
blueprintLog.Error(err, fmt.Sprintf("failed to
MakePlanForBlueprint on blueprint:[%d][%s]", blueprint.ID, blueprint.Name))
return nil, err
@@ -280,6 +293,12 @@ func createPipelineByBlueprint(blueprint
*models.Blueprint, skipCollectors bool)
newPipeline.Labels = blueprint.Labels
newPipeline.SkipOnFail = blueprint.SkipOnFail
+ if syncPolicy != nil && syncPolicy.FullSync {
+ newPipeline.FullSync = true
+ } else {
+ newPipeline.FullSync = blueprint.FullSync
+ }
+
// if the plan is empty, we should not create the pipeline
var shouldCreatePipeline bool
for _, stage := range plan {
@@ -306,9 +325,11 @@ func createPipelineByBlueprint(blueprint
*models.Blueprint, skipCollectors bool)
}
// MakePlanForBlueprint generates pipeline plan by version
-func MakePlanForBlueprint(blueprint *models.Blueprint, skipCollectors bool)
(models.PipelinePlan, errors.Error) {
- bpSyncPolicy := models.BlueprintSyncPolicy{}
- bpSyncPolicy.TimeAfter = blueprint.TimeAfter
+func MakePlanForBlueprint(blueprint *models.Blueprint, syncPolicy
*models.SyncPolicy) (models.PipelinePlan, errors.Error) {
+ if syncPolicy != nil {
+ syncPolicy.TimeAfter = blueprint.TimeAfter
+ syncPolicy.FullSync = blueprint.FullSync
+ }
var plan models.PipelinePlan
// load project metric plugins and convert it to a map
@@ -323,7 +344,7 @@ func MakePlanForBlueprint(blueprint *models.Blueprint,
skipCollectors bool) (mod
metrics[projectMetric.PluginName] =
json.RawMessage(projectMetric.PluginOption)
}
}
- plan, err := GeneratePlanJsonV200(blueprint.ProjectName, bpSyncPolicy,
blueprint.Connections, metrics, skipCollectors)
+ plan, err := GeneratePlanJsonV200(blueprint.ProjectName, syncPolicy,
blueprint.Connections, metrics)
if err != nil {
return nil, err
}
@@ -360,12 +381,12 @@ func SequencializePipelinePlans(plans
...models.PipelinePlan) models.PipelinePla
}
// TriggerBlueprint triggers blueprint immediately
-func TriggerBlueprint(id uint64, skipCollectors bool) (*models.Pipeline,
errors.Error) {
+func TriggerBlueprint(id uint64, syncPolicy *models.SyncPolicy)
(*models.Pipeline, errors.Error) {
// load record from db
blueprint, err := GetBlueprint(id)
if err != nil {
logger.Error(err, "GetBlueprint, id: %d", id)
return nil, err
}
- return createPipelineByBlueprint(blueprint, skipCollectors)
+ return createPipelineByBlueprint(blueprint, syncPolicy)
}
diff --git a/backend/server/services/blueprint_makeplan_v200.go
b/backend/server/services/blueprint_makeplan_v200.go
index 175b6d3c1..b4f1bfff7 100644
--- a/backend/server/services/blueprint_makeplan_v200.go
+++ b/backend/server/services/blueprint_makeplan_v200.go
@@ -30,10 +30,9 @@ import (
// GeneratePlanJsonV200 generates pipeline plan according v2.0.0 definition
func GeneratePlanJsonV200(
projectName string,
- syncPolicy coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
connections []*coreModels.BlueprintConnection,
metrics map[string]json.RawMessage,
- skipCollectors bool,
) (coreModels.PipelinePlan, errors.Error) {
var err errors.Error
// make plan for data-source coreModels fist. generate plan for each
@@ -73,7 +72,7 @@ func GeneratePlanJsonV200(
}
// skip collectors
- if skipCollectors {
+ if syncPolicy != nil && syncPolicy.SkipCollectors {
for i, plan := range sourcePlans {
for j, stage := range plan {
for k, task := range stage {
diff --git a/backend/server/services/blueprint_makeplan_v200_test.go
b/backend/server/services/blueprint_makeplan_v200_test.go
index ce9c3cf8b..093b9ea69 100644
--- a/backend/server/services/blueprint_makeplan_v200_test.go
+++ b/backend/server/services/blueprint_makeplan_v200_test.go
@@ -36,7 +36,7 @@ func TestMakePlanV200(t *testing.T) {
githubName := "TestMakePlanV200-github" // mimic github
// mock github plugin as a data source plugin
githubConnId := uint64(1)
- syncPolicy := coreModels.BlueprintSyncPolicy{}
+ syncPolicy := &coreModels.SyncPolicy{}
githubScopes := []*coreModels.BlueprintScope{
{ScopeId: "github:GithubRepo:1:123"},
{ScopeId: "github:GithubRepo:1:321"},
@@ -97,7 +97,7 @@ func TestMakePlanV200(t *testing.T) {
doraName: nil,
}
- plan, err := GeneratePlanJsonV200(projectName, syncPolicy, connections,
metrics, false)
+ plan, err := GeneratePlanJsonV200(projectName, syncPolicy, connections,
metrics)
assert.Nil(t, err)
assert.Equal(t, expectedPlan, plan)
diff --git a/backend/server/services/pipeline_helper.go
b/backend/server/services/pipeline_helper.go
index 409a2a58a..a681608e0 100644
--- a/backend/server/services/pipeline_helper.go
+++ b/backend/server/services/pipeline_helper.go
@@ -62,6 +62,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline)
(pipeline *models.Pipelin
SpentSeconds: 0,
Plan: newPipeline.Plan,
SkipOnFail: newPipeline.SkipOnFail,
+ FullSync: newPipeline.FullSync,
}
if newPipeline.BlueprintId != 0 {
dbPipeline.BlueprintId = newPipeline.BlueprintId
diff --git a/backend/server/services/remote/plugin/plugin_extensions.go
b/backend/server/services/remote/plugin/plugin_extensions.go
index bd331f5db..09e070383 100644
--- a/backend/server/services/remote/plugin/plugin_extensions.go
+++ b/backend/server/services/remote/plugin/plugin_extensions.go
@@ -43,7 +43,7 @@ func (p remoteMetricPlugin)
MakeMetricPluginPipelinePlanV200(projectName string,
func (p remoteDatasourcePlugin) MakeDataSourcePipelinePlanV200(
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy coreModels.BlueprintSyncPolicy,
+ syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
connection := p.connectionTabler.New()
err := p.connHelper.FirstById(connection, connectionId)
diff --git a/backend/test/helper/client.go b/backend/test/helper/client.go
index 8ac739d0d..446ea6d1f 100644
--- a/backend/test/helper/client.go
+++ b/backend/test/helper/client.go
@@ -240,6 +240,7 @@ func (d *DevlakeClient) RunPlugin(ctx context.Context,
pluginName string, plugin
task,
pluginTask,
nil,
+ nil,
)
}