This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch feat#5841
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/feat#5841 by this push:
     new 74b84c1e4 feat: allow users to configure the sync modes for plugins that support incremental sync
74b84c1e4 is described below

commit 74b84c1e479b6c914755b54547d617758b0eb2aa
Author: abeizn <[email protected]>
AuthorDate: Wed Aug 30 14:16:59 2023 +0800

    feat: allow users to configure the sync modes for plugins that support incremental sync
---
 .../20230828_add_is_full_sync_to_pipelines.go      | 49 ++++++++++++++++++++++
 backend/core/models/migrationscripts/register.go   |  1 +
 backend/core/models/pipeline.go                    |  2 +
 backend/core/plugin/plugin_task.go                 |  2 +
 backend/core/runner/directrun.go                   |  1 +
 backend/core/runner/run_task.go                    |  9 ++++
 backend/helpers/pluginhelper/api/api_collector.go  | 10 +++--
 .../pluginhelper/api/api_collector_with_state.go   |  9 +++-
 .../helpers/pluginhelper/api/graphql_collector.go  |  6 ++-
 backend/impls/context/default_task_context.go      | 13 +++++-
 backend/server/api/blueprints/blueprints.go        |  6 ++-
 backend/server/services/blueprint.go               |  9 ++--
 backend/server/services/pipeline_helper.go         |  1 +
 backend/test/helper/client.go                      |  1 +
 14 files changed, 106 insertions(+), 13 deletions(-)

diff --git a/backend/core/models/migrationscripts/20230828_add_is_full_sync_to_pipelines.go b/backend/core/models/migrationscripts/20230828_add_is_full_sync_to_pipelines.go
new file mode 100644
index 000000000..e4355ed9a
--- /dev/null
+++ b/backend/core/models/migrationscripts/20230828_add_is_full_sync_to_pipelines.go
@@ -0,0 +1,49 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+       "github.com/apache/incubator-devlake/core/context"
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/helpers/migrationhelper"
+)
+
+type addIsFullSync20200828 struct {
+       FullSync bool `json:"fullSync"`
+}
+
+func (*addIsFullSync20200828) TableName() string {
+       return "_devlake_pipelines"
+}
+
+type addFullSync struct{}
+
+func (*addFullSync) Up(basicRes context.BasicRes) errors.Error {
+       return migrationhelper.AutoMigrateTables(
+               basicRes,
+               &addIsFullSync20200828{},
+       )
+}
+
+func (*addFullSync) Version() uint64 {
+       return 20230828000041
+}
+
+func (*addFullSync) Name() string {
+       return "add full_sync to _devlake_pipelines table"
+}
diff --git a/backend/core/models/migrationscripts/register.go b/backend/core/models/migrationscripts/register.go
index eee037c60..fd0f469b9 100644
--- a/backend/core/models/migrationscripts/register.go
+++ b/backend/core/models/migrationscripts/register.go
@@ -91,5 +91,6 @@ func All() []plugin.MigrationScript {
                new(tasksUsesJSON),
                new(modifyCicdPipelinesToText),
                new(dropTapStateTable),
+               new(addFullSync),
        }
 }
diff --git a/backend/core/models/pipeline.go b/backend/core/models/pipeline.go
index bec361943..0d012dcce 100644
--- a/backend/core/models/pipeline.go
+++ b/backend/core/models/pipeline.go
@@ -40,6 +40,7 @@ type Pipeline struct {
        Stage         int                 `json:"stage"`
        Labels        []string            `json:"labels" gorm:"-"`
        SkipOnFail    bool                `json:"skipOnFail"`
+       FullSync      bool                `json:"fullSync"`
 }
 
 // We use a 2D array because the request body must be an array of a set of tasks
@@ -49,6 +50,7 @@ type NewPipeline struct {
        Plan        plugin.PipelinePlan `json:"plan" swaggertype:"array,string" example:"please check api /pipelines/<PLUGIN_NAME>/pipeline-plan"`
        Labels      []string            `json:"labels"`
        SkipOnFail  bool                `json:"skipOnFail"`
+       FullSync    bool                `json:"fullSync"`
        BlueprintId uint64
 }
 
diff --git a/backend/core/plugin/plugin_task.go b/backend/core/plugin/plugin_task.go
index d7dbc14bc..65318918b 100644
--- a/backend/core/plugin/plugin_task.go
+++ b/backend/core/plugin/plugin_task.go
@@ -62,6 +62,8 @@ type SubTaskContext interface {
 type TaskContext interface {
        ExecContext
        SetData(data interface{})
+       SetFullSync(fullSync bool)
+       FullSync() bool
        SubTaskContext(subtask string) (SubTaskContext, errors.Error)
 }
 
diff --git a/backend/core/runner/directrun.go b/backend/core/runner/directrun.go
index cb6935b9e..5699686ad 100644
--- a/backend/core/runner/directrun.go
+++ b/backend/core/runner/directrun.go
@@ -90,6 +90,7 @@ func DirectRun(cmd *cobra.Command, args []string, pluginTask plugin.PluginTask,
                task,
                pluginTask,
                nil,
+               false,
        )
        if err != nil {
                panic(err)
diff --git a/backend/core/runner/run_task.go b/backend/core/runner/run_task.go
index c0f80fa9f..7bd733a08 100644
--- a/backend/core/runner/run_task.go
+++ b/backend/core/runner/run_task.go
@@ -131,11 +131,16 @@ func RunTask(
                return dbe
        }
 
+       fullSync := false
+       if dbPipeline.FullSync {
+               fullSync = true
+       }
        err = RunPluginTask(
                ctx,
                basicRes.ReplaceLogger(logger),
                task,
                progress,
+               fullSync,
        )
        return err
 }
@@ -146,6 +151,7 @@ func RunPluginTask(
        basicRes context.BasicRes,
        task *models.Task,
        progress chan plugin.RunningProgress,
+       fullSync bool,
 ) errors.Error {
        pluginMeta, err := plugin.GetPlugin(task.Plugin)
        if err != nil {
@@ -161,6 +167,7 @@ func RunPluginTask(
                task,
                pluginTask,
                progress,
+               fullSync,
        )
 }
 
@@ -171,6 +178,7 @@ func RunPluginSubTasks(
        task *models.Task,
        pluginTask plugin.PluginTask,
        progress chan plugin.RunningProgress,
+       fullSync bool,
 ) errors.Error {
        logger := basicRes.GetLogger()
        logger.Info("start plugin")
@@ -236,6 +244,7 @@ func RunPluginSubTasks(
        if err != nil {
                return errors.Default.Wrap(err, fmt.Sprintf("error preparing task data for %s", task.Plugin))
        }
+       taskCtx.SetFullSync(fullSync)
        taskCtx.SetData(taskData)
 
        // execute subtasks in order
diff --git a/backend/helpers/pluginhelper/api/api_collector.go b/backend/helpers/pluginhelper/api/api_collector.go
index 0cd4b1c4d..53956c24f 100644
--- a/backend/helpers/pluginhelper/api/api_collector.go
+++ b/backend/helpers/pluginhelper/api/api_collector.go
@@ -21,7 +21,6 @@ import (
        "bytes"
        "encoding/json"
        "fmt"
-       "github.com/apache/incubator-devlake/core/plugin"
        "io"
        "net/http"
        "net/url"
@@ -29,6 +28,8 @@ import (
        "text/template"
        "time"
 
+       "github.com/apache/incubator-devlake/core/plugin"
+
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/common"
@@ -159,9 +160,12 @@ func (collector *ApiCollector) Execute() errors.Error {
        if err != nil {
                return errors.Default.Wrap(err, "error auto-migrating collector")
        }
-
+       isIncremental := collector.args.Incremental
+       if collector.args.Ctx.TaskContext().FullSync() {
+               isIncremental = false
+       }
        // flush data if not incremental collection
-       if !collector.args.Incremental {
+       if !isIncremental {
                err = db.Delete(&RawData{}, dal.From(collector.table), dal.Where("params = ?", collector.params))
                if err != nil {
                        return errors.Default.Wrap(err, "error deleting data from collector")
diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index 11dca16e6..3d6c5bd35 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -71,6 +71,9 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs, timeAfter *time.Time) (*Ap
 
 // IsIncremental indicates if the collector should operate in incremental mode
 func (m *ApiCollectorStateManager) IsIncremental() bool {
+       if m.Ctx.TaskContext().FullSync() {
+               return false
+       }
        prevSyncTime := m.LatestState.LatestSuccessStart
        prevTimeAfter := m.LatestState.TimeAfter
        currTimeAfter := m.TimeAfter
@@ -153,10 +156,12 @@ func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArg
        if err != nil {
                return nil, err
        }
-
        // // prepare the basic variables
-       var isIncremental = manager.IsIncremental()
        var createdAfter *time.Time
+       var isIncremental = manager.IsIncremental()
+       if manager.Ctx.TaskContext().FullSync() {
+               isIncremental = false
+       }
        if isIncremental {
                createdAfter = manager.LatestState.LatestSuccessStart
        } else {
diff --git a/backend/helpers/pluginhelper/api/graphql_collector.go b/backend/helpers/pluginhelper/api/graphql_collector.go
index 0347d5154..588fd7cec 100644
--- a/backend/helpers/pluginhelper/api/graphql_collector.go
+++ b/backend/helpers/pluginhelper/api/graphql_collector.go
@@ -138,8 +138,12 @@ func (collector *GraphqlCollector) Execute() errors.Error {
 
        divider := NewBatchSaveDivider(collector.args.Ctx, collector.args.BatchSize, collector.table, collector.params)
 
+       isIncremental := collector.args.Incremental
+       if collector.args.Ctx.TaskContext().FullSync() {
+               isIncremental = false
+       }
        // flush data if not incremental collection
-       if collector.args.Incremental {
+       if isIncremental {
                // re extract data for new scope config
                err = collector.ExtractExistRawData(divider)
                if err != nil {
diff --git a/backend/impls/context/default_task_context.go b/backend/impls/context/default_task_context.go
index 1a6b79529..b0fa7db81 100644
--- a/backend/impls/context/default_task_context.go
+++ b/backend/impls/context/default_task_context.go
@@ -20,10 +20,11 @@ package context
 import (
        gocontext "context"
        "fmt"
+       "time"
+
        "github.com/apache/incubator-devlake/core/context"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
-       "time"
 )
 
 // DefaultTaskContext is TaskContext default implementation
@@ -31,6 +32,7 @@ type DefaultTaskContext struct {
        *defaultExecContext
        subtasks    map[string]bool
        subtaskCtxs map[string]*DefaultSubTaskContext
+       fullSync    bool
 }
 
 // SetProgress FIXME ...
@@ -45,6 +47,14 @@ func (c *DefaultTaskContext) IncProgress(quantity int) {
        c.BasicRes.GetLogger().Info("finished step: %d / %d", c.current, c.total)
 }
 
+func (c *DefaultTaskContext) SetFullSync(fullSync bool) {
+       c.fullSync = fullSync
+}
+
+func (c *DefaultTaskContext) FullSync() bool {
+       return c.fullSync
+}
+
 // SubTaskContext FIXME ...
 func (c *DefaultTaskContext) SubTaskContext(subtask string) (plugin.SubTaskContext, errors.Error) {
        // no need to lock at this point because subtasks is written only once
@@ -86,6 +96,7 @@ func NewDefaultTaskContext(
                newDefaultExecContext(ctx, basicRes, name, nil, progress),
                subtasks,
                make(map[string]*DefaultSubTaskContext),
+               false,
        }
 }
 
diff --git a/backend/server/api/blueprints/blueprints.go b/backend/server/api/blueprints/blueprints.go
index ef2afc89c..7d020098e 100644
--- a/backend/server/api/blueprints/blueprints.go
+++ b/backend/server/api/blueprints/blueprints.go
@@ -169,6 +169,7 @@ func Patch(c *gin.Context) {
 // @Accept application/json
 // @Param blueprintId path string true "blueprintId"
 // @Param skipCollectors body bool false "skipCollectors"
+// @Param fullSync body bool false "fullSync"
 // @Success 200 {object} models.Pipeline
 // @Failure 400 {object} shared.ApiBody "Bad Request"
 // @Failure 500 {object} shared.ApiBody "Internal Error"
@@ -183,10 +184,11 @@ func Trigger(c *gin.Context) {
 
        var body struct {
                SkipCollectors bool `json:"skipCollectors"`
+               FullSync       bool `json:"fullSync"`
        }
-
        if c.Request.Body == nil || c.Request.ContentLength == 0 {
                body.SkipCollectors = false
+               body.FullSync = false
        } else {
                err = c.ShouldBindJSON(&body)
                if err != nil {
@@ -195,7 +197,7 @@ func Trigger(c *gin.Context) {
                }
        }
 
-       pipeline, err := services.TriggerBlueprint(id, body.SkipCollectors)
+       pipeline, err := services.TriggerBlueprint(id, body.SkipCollectors, body.FullSync)
        if err != nil {
                shared.ApiOutputError(c, errors.Default.Wrap(err, "error triggering blueprint"))
                return
diff --git a/backend/server/services/blueprint.go b/backend/server/services/blueprint.go
index 29aa62631..2209cf5b2 100644
--- a/backend/server/services/blueprint.go
+++ b/backend/server/services/blueprint.go
@@ -54,7 +54,7 @@ type BlueprintJob struct {
 
 func (bj BlueprintJob) Run() {
        blueprint := bj.Blueprint
-       pipeline, err := createPipelineByBlueprint(blueprint, false)
+       pipeline, err := createPipelineByBlueprint(blueprint, false, false)
        if err == ErrEmptyPlan {
                blueprintLog.Info("Empty plan, blueprint id:[%d] blueprint name:[%s]", blueprint.ID, blueprint.Name)
                return
@@ -265,7 +265,7 @@ func ReloadBlueprints(c *cron.Cron) errors.Error {
        return nil
 }
 
-func createPipelineByBlueprint(blueprint *models.Blueprint, skipCollectors bool) (*models.Pipeline, errors.Error) {
+func createPipelineByBlueprint(blueprint *models.Blueprint, skipCollectors bool, fullSync bool) (*models.Pipeline, errors.Error) {
        var plan plugin.PipelinePlan
        var err errors.Error
        if blueprint.Mode == models.BLUEPRINT_MODE_NORMAL {
@@ -283,6 +283,7 @@ func createPipelineByBlueprint(blueprint *models.Blueprint, skipCollectors bool)
        newPipeline.BlueprintId = blueprint.ID
        newPipeline.Labels = blueprint.Labels
        newPipeline.SkipOnFail = blueprint.SkipOnFail
+       newPipeline.FullSync = fullSync
 
        // if the plan is empty, we should not create the pipeline
        var shouldCreatePipeline bool
@@ -398,12 +399,12 @@ func SequencializePipelinePlans(plans ...plugin.PipelinePlan) plugin.PipelinePla
 }
 
 // TriggerBlueprint triggers blueprint immediately
-func TriggerBlueprint(id uint64, skipCollectors bool) (*models.Pipeline, errors.Error) {
+func TriggerBlueprint(id uint64, skipCollectors bool, fullSync bool) (*models.Pipeline, errors.Error) {
        // load record from db
        blueprint, err := GetBlueprint(id)
        if err != nil {
                logger.Error(err, "GetBlueprint, id: %d", id)
                return nil, err
        }
-       return createPipelineByBlueprint(blueprint, skipCollectors)
+       return createPipelineByBlueprint(blueprint, skipCollectors, fullSync)
 }
diff --git a/backend/server/services/pipeline_helper.go b/backend/server/services/pipeline_helper.go
index 409a2a58a..a681608e0 100644
--- a/backend/server/services/pipeline_helper.go
+++ b/backend/server/services/pipeline_helper.go
@@ -62,6 +62,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) (pipeline *models.Pipelin
                SpentSeconds:  0,
                Plan:          newPipeline.Plan,
                SkipOnFail:    newPipeline.SkipOnFail,
+               FullSync:      newPipeline.FullSync,
        }
        if newPipeline.BlueprintId != 0 {
                dbPipeline.BlueprintId = newPipeline.BlueprintId
diff --git a/backend/test/helper/client.go b/backend/test/helper/client.go
index 8ac739d0d..82fb8db5e 100644
--- a/backend/test/helper/client.go
+++ b/backend/test/helper/client.go
@@ -240,6 +240,7 @@ func (d *DevlakeClient) RunPlugin(ctx context.Context, pluginName string, plugin
                task,
                pluginTask,
                nil,
+               false,
        )
 }
 

Reply via email to