This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch feat#5841
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/feat#5841 by this push:
new 74b84c1e4 feat: allow users to configure the sync modes for plugins that support incremental sync
74b84c1e4 is described below
commit 74b84c1e479b6c914755b54547d617758b0eb2aa
Author: abeizn <[email protected]>
AuthorDate: Wed Aug 30 14:16:59 2023 +0800
feat: allow users to configure the sync modes for plugins that support incremental sync
---
.../20230828_add_is_full_sync_to_pipelines.go | 49 ++++++++++++++++++++++
backend/core/models/migrationscripts/register.go | 1 +
backend/core/models/pipeline.go | 2 +
backend/core/plugin/plugin_task.go | 2 +
backend/core/runner/directrun.go | 1 +
backend/core/runner/run_task.go | 9 ++++
backend/helpers/pluginhelper/api/api_collector.go | 10 +++--
.../pluginhelper/api/api_collector_with_state.go | 9 +++-
.../helpers/pluginhelper/api/graphql_collector.go | 6 ++-
backend/impls/context/default_task_context.go | 13 +++++-
backend/server/api/blueprints/blueprints.go | 6 ++-
backend/server/services/blueprint.go | 9 ++--
backend/server/services/pipeline_helper.go | 1 +
backend/test/helper/client.go | 1 +
14 files changed, 106 insertions(+), 13 deletions(-)
diff --git a/backend/core/models/migrationscripts/20230828_add_is_full_sync_to_pipelines.go b/backend/core/models/migrationscripts/20230828_add_is_full_sync_to_pipelines.go
new file mode 100644
index 000000000..e4355ed9a
--- /dev/null
+++ b/backend/core/models/migrationscripts/20230828_add_is_full_sync_to_pipelines.go
@@ -0,0 +1,49 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+ "github.com/apache/incubator-devlake/core/context"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/helpers/migrationhelper"
+)
+
+type addIsFullSync20200828 struct {
+ FullSync bool `json:"fullSync"`
+}
+
+func (*addIsFullSync20200828) TableName() string {
+ return "_devlake_pipelines"
+}
+
+type addFullSync struct{}
+
+func (*addFullSync) Up(basicRes context.BasicRes) errors.Error {
+ return migrationhelper.AutoMigrateTables(
+ basicRes,
+ &addIsFullSync20200828{},
+ )
+}
+
+func (*addFullSync) Version() uint64 {
+ return 20230828000041
+}
+
+func (*addFullSync) Name() string {
+ return "add full_sync to _devlake_pipelines table"
+}
diff --git a/backend/core/models/migrationscripts/register.go b/backend/core/models/migrationscripts/register.go
index eee037c60..fd0f469b9 100644
--- a/backend/core/models/migrationscripts/register.go
+++ b/backend/core/models/migrationscripts/register.go
@@ -91,5 +91,6 @@ func All() []plugin.MigrationScript {
new(tasksUsesJSON),
new(modifyCicdPipelinesToText),
new(dropTapStateTable),
+ new(addFullSync),
}
}
diff --git a/backend/core/models/pipeline.go b/backend/core/models/pipeline.go
index bec361943..0d012dcce 100644
--- a/backend/core/models/pipeline.go
+++ b/backend/core/models/pipeline.go
@@ -40,6 +40,7 @@ type Pipeline struct {
Stage int `json:"stage"`
Labels []string `json:"labels" gorm:"-"`
SkipOnFail bool `json:"skipOnFail"`
+ FullSync bool `json:"fullSync"`
}
// We use a 2D array because the request body must be an array of a set of
tasks
@@ -49,6 +50,7 @@ type NewPipeline struct {
Plan plugin.PipelinePlan `json:"plan" swaggertype:"array,string"
example:"please check api /pipelines/<PLUGIN_NAME>/pipeline-plan"`
Labels []string `json:"labels"`
SkipOnFail bool `json:"skipOnFail"`
+ FullSync bool `json:"fullSync"`
BlueprintId uint64
}
diff --git a/backend/core/plugin/plugin_task.go b/backend/core/plugin/plugin_task.go
index d7dbc14bc..65318918b 100644
--- a/backend/core/plugin/plugin_task.go
+++ b/backend/core/plugin/plugin_task.go
@@ -62,6 +62,8 @@ type SubTaskContext interface {
type TaskContext interface {
ExecContext
SetData(data interface{})
+ SetFullSync(fullSync bool)
+ FullSync() bool
SubTaskContext(subtask string) (SubTaskContext, errors.Error)
}
diff --git a/backend/core/runner/directrun.go b/backend/core/runner/directrun.go
index cb6935b9e..5699686ad 100644
--- a/backend/core/runner/directrun.go
+++ b/backend/core/runner/directrun.go
@@ -90,6 +90,7 @@ func DirectRun(cmd *cobra.Command, args []string, pluginTask plugin.PluginTask,
task,
pluginTask,
nil,
+ false,
)
if err != nil {
panic(err)
diff --git a/backend/core/runner/run_task.go b/backend/core/runner/run_task.go
index c0f80fa9f..7bd733a08 100644
--- a/backend/core/runner/run_task.go
+++ b/backend/core/runner/run_task.go
@@ -131,11 +131,16 @@ func RunTask(
return dbe
}
+ fullSync := false
+ if dbPipeline.FullSync {
+ fullSync = true
+ }
err = RunPluginTask(
ctx,
basicRes.ReplaceLogger(logger),
task,
progress,
+ fullSync,
)
return err
}
@@ -146,6 +151,7 @@ func RunPluginTask(
basicRes context.BasicRes,
task *models.Task,
progress chan plugin.RunningProgress,
+ fullSync bool,
) errors.Error {
pluginMeta, err := plugin.GetPlugin(task.Plugin)
if err != nil {
@@ -161,6 +167,7 @@ func RunPluginTask(
task,
pluginTask,
progress,
+ fullSync,
)
}
@@ -171,6 +178,7 @@ func RunPluginSubTasks(
task *models.Task,
pluginTask plugin.PluginTask,
progress chan plugin.RunningProgress,
+ fullSync bool,
) errors.Error {
logger := basicRes.GetLogger()
logger.Info("start plugin")
@@ -236,6 +244,7 @@ func RunPluginSubTasks(
if err != nil {
return errors.Default.Wrap(err, fmt.Sprintf("error preparing
task data for %s", task.Plugin))
}
+ taskCtx.SetFullSync(fullSync)
taskCtx.SetData(taskData)
// execute subtasks in order
diff --git a/backend/helpers/pluginhelper/api/api_collector.go b/backend/helpers/pluginhelper/api/api_collector.go
index 0cd4b1c4d..53956c24f 100644
--- a/backend/helpers/pluginhelper/api/api_collector.go
+++ b/backend/helpers/pluginhelper/api/api_collector.go
@@ -21,7 +21,6 @@ import (
"bytes"
"encoding/json"
"fmt"
- "github.com/apache/incubator-devlake/core/plugin"
"io"
"net/http"
"net/url"
@@ -29,6 +28,8 @@ import (
"text/template"
"time"
+ "github.com/apache/incubator-devlake/core/plugin"
+
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/helpers/pluginhelper/common"
@@ -159,9 +160,12 @@ func (collector *ApiCollector) Execute() errors.Error {
if err != nil {
return errors.Default.Wrap(err, "error auto-migrating
collector")
}
-
+ isIncremental := collector.args.Incremental
+ if collector.args.Ctx.TaskContext().FullSync() {
+ isIncremental = false
+ }
// flush data if not incremental collection
- if !collector.args.Incremental {
+ if !isIncremental {
err = db.Delete(&RawData{}, dal.From(collector.table),
dal.Where("params = ?", collector.params))
if err != nil {
return errors.Default.Wrap(err, "error deleting data
from collector")
diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index 11dca16e6..3d6c5bd35 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -71,6 +71,9 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs, timeAfter *time.Time) (*Ap
// IsIncremental indicates if the collector should operate in incremental mode
func (m *ApiCollectorStateManager) IsIncremental() bool {
+ if m.Ctx.TaskContext().FullSync() {
+ return false
+ }
prevSyncTime := m.LatestState.LatestSuccessStart
prevTimeAfter := m.LatestState.TimeAfter
currTimeAfter := m.TimeAfter
@@ -153,10 +156,12 @@ func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArg
if err != nil {
return nil, err
}
-
// // prepare the basic variables
- var isIncremental = manager.IsIncremental()
var createdAfter *time.Time
+ var isIncremental = manager.IsIncremental()
+ if manager.Ctx.TaskContext().FullSync() {
+ isIncremental = false
+ }
if isIncremental {
createdAfter = manager.LatestState.LatestSuccessStart
} else {
diff --git a/backend/helpers/pluginhelper/api/graphql_collector.go b/backend/helpers/pluginhelper/api/graphql_collector.go
index 0347d5154..588fd7cec 100644
--- a/backend/helpers/pluginhelper/api/graphql_collector.go
+++ b/backend/helpers/pluginhelper/api/graphql_collector.go
@@ -138,8 +138,12 @@ func (collector *GraphqlCollector) Execute() errors.Error {
divider := NewBatchSaveDivider(collector.args.Ctx,
collector.args.BatchSize, collector.table, collector.params)
+ isIncremental := collector.args.Incremental
+ if collector.args.Ctx.TaskContext().FullSync() {
+ isIncremental = false
+ }
// flush data if not incremental collection
- if collector.args.Incremental {
+ if isIncremental {
// re extract data for new scope config
err = collector.ExtractExistRawData(divider)
if err != nil {
diff --git a/backend/impls/context/default_task_context.go b/backend/impls/context/default_task_context.go
index 1a6b79529..b0fa7db81 100644
--- a/backend/impls/context/default_task_context.go
+++ b/backend/impls/context/default_task_context.go
@@ -20,10 +20,11 @@ package context
import (
gocontext "context"
"fmt"
+ "time"
+
"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
- "time"
)
// DefaultTaskContext is TaskContext default implementation
@@ -31,6 +32,7 @@ type DefaultTaskContext struct {
*defaultExecContext
subtasks map[string]bool
subtaskCtxs map[string]*DefaultSubTaskContext
+ fullSync bool
}
// SetProgress FIXME ...
@@ -45,6 +47,14 @@ func (c *DefaultTaskContext) IncProgress(quantity int) {
c.BasicRes.GetLogger().Info("finished step: %d / %d", c.current,
c.total)
}
+func (c *DefaultTaskContext) SetFullSync(fullSync bool) {
+ c.fullSync = fullSync
+}
+
+func (c *DefaultTaskContext) FullSync() bool {
+ return c.fullSync
+}
+
// SubTaskContext FIXME ...
func (c *DefaultTaskContext) SubTaskContext(subtask string)
(plugin.SubTaskContext, errors.Error) {
// no need to lock at this point because subtasks is written only once
@@ -86,6 +96,7 @@ func NewDefaultTaskContext(
newDefaultExecContext(ctx, basicRes, name, nil, progress),
subtasks,
make(map[string]*DefaultSubTaskContext),
+ false,
}
}
diff --git a/backend/server/api/blueprints/blueprints.go b/backend/server/api/blueprints/blueprints.go
index ef2afc89c..7d020098e 100644
--- a/backend/server/api/blueprints/blueprints.go
+++ b/backend/server/api/blueprints/blueprints.go
@@ -169,6 +169,7 @@ func Patch(c *gin.Context) {
// @Accept application/json
// @Param blueprintId path string true "blueprintId"
// @Param skipCollectors body bool false "skipCollectors"
+// @Param fullSync body bool false "fullSync"
// @Success 200 {object} models.Pipeline
// @Failure 400 {object} shared.ApiBody "Bad Request"
// @Failure 500 {object} shared.ApiBody "Internal Error"
@@ -183,10 +184,11 @@ func Trigger(c *gin.Context) {
var body struct {
SkipCollectors bool `json:"skipCollectors"`
+ FullSync bool `json:"fullSync"`
}
-
if c.Request.Body == nil || c.Request.ContentLength == 0 {
body.SkipCollectors = false
+ body.FullSync = false
} else {
err = c.ShouldBindJSON(&body)
if err != nil {
@@ -195,7 +197,7 @@ func Trigger(c *gin.Context) {
}
}
- pipeline, err := services.TriggerBlueprint(id, body.SkipCollectors)
+ pipeline, err := services.TriggerBlueprint(id, body.SkipCollectors,
body.FullSync)
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error
triggering blueprint"))
return
diff --git a/backend/server/services/blueprint.go b/backend/server/services/blueprint.go
index 29aa62631..2209cf5b2 100644
--- a/backend/server/services/blueprint.go
+++ b/backend/server/services/blueprint.go
@@ -54,7 +54,7 @@ type BlueprintJob struct {
func (bj BlueprintJob) Run() {
blueprint := bj.Blueprint
- pipeline, err := createPipelineByBlueprint(blueprint, false)
+ pipeline, err := createPipelineByBlueprint(blueprint, false, false)
if err == ErrEmptyPlan {
blueprintLog.Info("Empty plan, blueprint id:[%d] blueprint
name:[%s]", blueprint.ID, blueprint.Name)
return
@@ -265,7 +265,7 @@ func ReloadBlueprints(c *cron.Cron) errors.Error {
return nil
}
-func createPipelineByBlueprint(blueprint *models.Blueprint, skipCollectors
bool) (*models.Pipeline, errors.Error) {
+func createPipelineByBlueprint(blueprint *models.Blueprint, skipCollectors
bool, fullSync bool) (*models.Pipeline, errors.Error) {
var plan plugin.PipelinePlan
var err errors.Error
if blueprint.Mode == models.BLUEPRINT_MODE_NORMAL {
@@ -283,6 +283,7 @@ func createPipelineByBlueprint(blueprint *models.Blueprint, skipCollectors bool)
newPipeline.BlueprintId = blueprint.ID
newPipeline.Labels = blueprint.Labels
newPipeline.SkipOnFail = blueprint.SkipOnFail
+ newPipeline.FullSync = fullSync
// if the plan is empty, we should not create the pipeline
var shouldCreatePipeline bool
@@ -398,12 +399,12 @@ func SequencializePipelinePlans(plans ...plugin.PipelinePlan) plugin.PipelinePla
}
// TriggerBlueprint triggers blueprint immediately
-func TriggerBlueprint(id uint64, skipCollectors bool) (*models.Pipeline,
errors.Error) {
+func TriggerBlueprint(id uint64, skipCollectors bool, fullSync bool)
(*models.Pipeline, errors.Error) {
// load record from db
blueprint, err := GetBlueprint(id)
if err != nil {
logger.Error(err, "GetBlueprint, id: %d", id)
return nil, err
}
- return createPipelineByBlueprint(blueprint, skipCollectors)
+ return createPipelineByBlueprint(blueprint, skipCollectors, fullSync)
}
diff --git a/backend/server/services/pipeline_helper.go b/backend/server/services/pipeline_helper.go
index 409a2a58a..a681608e0 100644
--- a/backend/server/services/pipeline_helper.go
+++ b/backend/server/services/pipeline_helper.go
@@ -62,6 +62,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) (pipeline *models.Pipelin
SpentSeconds: 0,
Plan: newPipeline.Plan,
SkipOnFail: newPipeline.SkipOnFail,
+ FullSync: newPipeline.FullSync,
}
if newPipeline.BlueprintId != 0 {
dbPipeline.BlueprintId = newPipeline.BlueprintId
diff --git a/backend/test/helper/client.go b/backend/test/helper/client.go
index 8ac739d0d..82fb8db5e 100644
--- a/backend/test/helper/client.go
+++ b/backend/test/helper/client.go
@@ -240,6 +240,7 @@ func (d *DevlakeClient) RunPlugin(ctx context.Context, pluginName string, plugin
task,
pluginTask,
nil,
+ false,
)
}