This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch feat#5841-4
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/feat#5841-4 by this push:
new 06f3cf2b4 refactor: syncPolicy timeAfter
06f3cf2b4 is described below
commit 06f3cf2b4f7df6e5e2fa180a0bbcc28183541f8c
Author: abeizn <[email protected]>
AuthorDate: Tue Sep 12 17:45:39 2023 +0800
refactor: syncPolicy timeAfter
---
backend/core/models/blueprint.go | 4 +--
...dd_full_sync.go => 20230911_add_sync_policy.go} | 33 ++++++++++++---------
backend/core/models/migrationscripts/register.go | 2 +-
backend/core/models/pipeline.go | 6 ++--
backend/core/plugin/plugin_blueprint.go | 1 -
backend/core/runner/run_task.go | 12 ++++++--
.../pluginhelper/api/api_collector_with_state.go | 21 +++++++------
backend/plugins/bamboo/api/blueprint_V200_test.go | 3 +-
backend/plugins/bamboo/api/blueprint_v200.go | 5 ++--
backend/plugins/bamboo/impl/impl.go | 3 +-
.../plugins/bitbucket/api/blueprint_V200_test.go | 3 +-
backend/plugins/bitbucket/api/blueprint_v200.go | 8 +----
backend/plugins/bitbucket/bitbucket.go | 2 --
backend/plugins/bitbucket/impl/impl.go | 17 ++---------
backend/plugins/bitbucket/tasks/api_common.go | 14 +++++----
backend/plugins/bitbucket/tasks/issue_collector.go | 2 +-
.../bitbucket/tasks/issue_comment_collector.go | 2 +-
.../plugins/bitbucket/tasks/pipeline_collector.go | 2 +-
.../bitbucket/tasks/pipeline_steps_collector.go | 2 +-
backend/plugins/bitbucket/tasks/pr_collector.go | 2 +-
.../bitbucket/tasks/pr_comment_collector.go | 2 +-
.../plugins/bitbucket/tasks/pr_commit_collector.go | 5 ++--
backend/plugins/bitbucket/tasks/task_data.go | 4 ---
backend/plugins/circleci/api/blueprint200.go | 8 +----
backend/plugins/circleci/impl/impl.go | 14 +--------
backend/plugins/circleci/tasks/task_data.go | 4 ---
backend/plugins/github/api/blueprint_V200_test.go | 3 +-
backend/plugins/github/api/blueprint_v200.go | 8 +----
backend/plugins/github/github.go | 2 --
backend/plugins/github/impl/impl.go | 13 +--------
backend/plugins/github/tasks/cicd_job_collector.go | 2 +-
backend/plugins/github/tasks/cicd_run_collector.go | 1 -
backend/plugins/github/tasks/comment_collector.go | 7 +++--
backend/plugins/github/tasks/commit_collector.go | 7 +++--
backend/plugins/github/tasks/event_collector.go | 1 -
backend/plugins/github/tasks/issue_collector.go | 7 +++--
backend/plugins/github/tasks/pr_collector.go | 1 -
.../plugins/github/tasks/pr_commit_collector.go | 2 +-
.../plugins/github/tasks/pr_review_collector.go | 2 +-
.../github/tasks/pr_review_comment_collector.go | 7 +++--
backend/plugins/github/tasks/task_data.go | 3 --
backend/plugins/github_graphql/impl/impl.go | 9 ------
backend/plugins/github_graphql/plugin_main.go | 2 --
.../github_graphql/tasks/issue_collector.go | 8 ++---
.../plugins/github_graphql/tasks/job_collector.go | 2 +-
.../plugins/github_graphql/tasks/pr_collector.go | 5 ++--
backend/plugins/gitlab/api/blueprint_V200_test.go | 3 +-
backend/plugins/gitlab/api/blueprint_v200.go | 9 ++----
backend/plugins/gitlab/gitlab.go | 2 --
backend/plugins/gitlab/impl/impl.go | 16 +---------
backend/plugins/gitlab/tasks/issue_collector.go | 7 +++--
backend/plugins/gitlab/tasks/job_collector.go | 1 -
backend/plugins/gitlab/tasks/mr_collector.go | 7 +++--
.../plugins/gitlab/tasks/mr_commit_collector.go | 2 +-
.../plugins/gitlab/tasks/mr_detail_collector.go | 7 +++--
backend/plugins/gitlab/tasks/mr_note_collector.go | 2 +-
backend/plugins/gitlab/tasks/pipeline_collector.go | 7 +++--
.../gitlab/tasks/pipeline_detail_collector.go | 2 +-
backend/plugins/gitlab/tasks/task_data.go | 4 ---
backend/plugins/jenkins/api/blueprint_v200.go | 10 +------
backend/plugins/jenkins/api/blueprint_v200_test.go | 4 +--
backend/plugins/jenkins/impl/impl.go | 16 ++--------
backend/plugins/jenkins/jenkins.go | 2 --
backend/plugins/jenkins/tasks/build_collector.go | 1 -
backend/plugins/jenkins/tasks/stage_collector.go | 6 ++--
backend/plugins/jenkins/tasks/task_data.go | 11 +++----
backend/plugins/jira/api/blueprint_v200.go | 9 +-----
backend/plugins/jira/api/blueprint_v200_test.go | 4 +--
backend/plugins/jira/impl/impl.go | 14 ++-------
backend/plugins/jira/jira.go | 3 +-
.../jira/tasks/development_panel_collector.go | 2 +-
.../jira/tasks/issue_changelog_collector.go | 2 +-
backend/plugins/jira/tasks/issue_collector.go | 16 +++++++---
.../plugins/jira/tasks/issue_comment_collector.go | 2 +-
backend/plugins/jira/tasks/remotelink_collector.go | 2 +-
backend/plugins/jira/tasks/task_data.go | 7 ++---
backend/plugins/jira/tasks/worklog_collector.go | 2 +-
backend/plugins/pagerduty/api/blueprint_v200.go | 10 ++-----
backend/plugins/pagerduty/impl/impl.go | 18 +++---------
.../plugins/pagerduty/tasks/incidents_collector.go | 1 -
backend/plugins/pagerduty/tasks/task_data.go | 8 ++---
backend/plugins/sonarqube/api/blueprint_v200.go | 4 +--
.../plugins/sonarqube/api/blueprint_v200_test.go | 4 +--
backend/plugins/sonarqube/impl/impl.go | 3 +-
backend/plugins/tapd/api/blueprint_v200.go | 8 +----
backend/plugins/tapd/api/blueprint_v200_test.go | 3 +-
backend/plugins/tapd/impl/impl.go | 12 +-------
.../plugins/tapd/tasks/bug_changelog_collector.go | 7 +++--
backend/plugins/tapd/tasks/bug_collector.go | 11 +++----
backend/plugins/tapd/tasks/bug_commit_collector.go | 14 +++++----
backend/plugins/tapd/tasks/iteration_collector.go | 7 +++--
backend/plugins/tapd/tasks/story_bug_collector.go | 12 ++++----
.../tapd/tasks/story_changelog_collector.go | 7 +++--
backend/plugins/tapd/tasks/story_collector.go | 7 +++--
.../plugins/tapd/tasks/story_commit_collector.go | 14 +++++----
.../plugins/tapd/tasks/task_changelog_collector.go | 8 ++---
backend/plugins/tapd/tasks/task_collector.go | 8 ++---
.../plugins/tapd/tasks/task_commit_collector.go | 14 +++++----
backend/plugins/tapd/tasks/task_data.go | 2 --
backend/plugins/tapd/tasks/worklog_collector.go | 8 ++---
backend/plugins/teambition/api/blueprint200.go | 8 +----
backend/plugins/teambition/impl/impl.go | 15 ++--------
backend/plugins/teambition/tasks/task_data.go | 5 ++--
backend/plugins/trello/api/blueprint_v200.go | 10 ++-----
backend/plugins/trello/impl/impl.go | 3 +-
backend/plugins/zentao/api/blueprint_V200_test.go | 3 +-
backend/plugins/zentao/api/blueprint_v200.go | 9 +-----
backend/plugins/zentao/impl/impl.go | 3 +-
.../plugins/zentao/tasks/bug_commits_collector.go | 2 +-
.../zentao/tasks/story_commits_collector.go | 2 +-
.../plugins/zentao/tasks/task_commits_collector.go | 2 +-
backend/plugins/zentao/tasks/task_data.go | 3 --
backend/server/api/blueprints/blueprints.go | 1 -
backend/server/services/blueprint.go | 34 +++++++++-------------
backend/server/services/blueprint_makeplan_v200.go | 5 ++--
.../services/blueprint_makeplan_v200_test.go | 2 +-
backend/server/services/pipeline_helper.go | 6 ++--
.../services/remote/plugin/plugin_extensions.go | 1 -
backend/test/helper/api.go | 6 ++--
119 files changed, 280 insertions(+), 485 deletions(-)
diff --git a/backend/core/models/blueprint.go b/backend/core/models/blueprint.go
index 371b97210..b9ff2a457 100644
--- a/backend/core/models/blueprint.go
+++ b/backend/core/models/blueprint.go
@@ -37,13 +37,11 @@ type Blueprint struct {
Enable bool `json:"enable"`
CronConfig string `json:"cronConfig" format:"* * * *
*" example:"0 0 * * 1"`
IsManual bool `json:"isManual"`
- SkipOnFail bool `json:"skipOnFail"`
BeforePlan PipelinePlan `json:"beforePlan"
gorm:"serializer:encdec"`
AfterPlan PipelinePlan `json:"afterPlan"
gorm:"serializer:encdec"`
- TimeAfter *time.Time `json:"timeAfter"`
- FullSync bool `json:"fullSync"`
Labels []string `json:"labels" gorm:"-"`
Connections []*BlueprintConnection `json:"connections" gorm:"-"`
+ SyncPolicy `gorm:"embedded"`
common.Model `swaggerignore:"true"`
}
diff --git a/backend/core/models/migrationscripts/20230907_add_full_sync.go
b/backend/core/models/migrationscripts/20230911_add_sync_policy.go
similarity index 59%
rename from backend/core/models/migrationscripts/20230907_add_full_sync.go
rename to backend/core/models/migrationscripts/20230911_add_sync_policy.go
index 293ac69a9..c9e91bf59 100644
--- a/backend/core/models/migrationscripts/20230907_add_full_sync.go
+++ b/backend/core/models/migrationscripts/20230911_add_sync_policy.go
@@ -18,41 +18,46 @@ limitations under the License.
package migrationscripts
import (
+ "time"
+
"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/helpers/migrationhelper"
)
-type addFullSyncToBlueprint struct {
- FullSync bool `json:"fullSync"`
+type addSyncPolicyToBlueprint struct {
+ FullSync bool `json:"fullSync"`
+ SkipCollectors bool `json:"skipCollectors"`
}
-func (*addFullSyncToBlueprint) TableName() string {
+func (*addSyncPolicyToBlueprint) TableName() string {
return "_devlake_blueprints"
}
-type addFullSyncToPipeline struct {
- FullSync bool `json:"fullSync"`
+type addSyncPolicyToPipeline struct {
+ FullSync bool `json:"fullSync"`
+ SkipCollectors bool `json:"skipCollectors"`
+ TimeAfter *time.Time `json:"timeAfter"`
}
-func (*addFullSyncToPipeline) TableName() string {
+func (*addSyncPolicyToPipeline) TableName() string {
return "_devlake_pipelines"
}
-type addFullSync struct{}
+type addSyncPolicy struct{}
-func (*addFullSync) Up(basicRes context.BasicRes) errors.Error {
+func (*addSyncPolicy) Up(basicRes context.BasicRes) errors.Error {
return migrationhelper.AutoMigrateTables(
basicRes,
- &addFullSyncToBlueprint{},
- &addFullSyncToPipeline{},
+ &addSyncPolicyToBlueprint{},
+ &addSyncPolicyToPipeline{},
)
}
-func (*addFullSync) Version() uint64 {
- return 20230907000041
+func (*addSyncPolicy) Version() uint64 {
+ return 20230911000041
}
-func (*addFullSync) Name() string {
- return "add full_sync to _devlake_blueprints and _devlake_pipelines
table"
+func (*addSyncPolicy) Name() string {
+ return "add sync policy to _devlake_blueprints and _devlake_pipelines
table"
}
diff --git a/backend/core/models/migrationscripts/register.go
b/backend/core/models/migrationscripts/register.go
index 7c86af0d4..3874e9d91 100644
--- a/backend/core/models/migrationscripts/register.go
+++ b/backend/core/models/migrationscripts/register.go
@@ -93,6 +93,6 @@ func All() []plugin.MigrationScript {
new(dropTapStateTable),
new(addCICDDeploymentsTable),
new(normalizeBpSettings),
- new(addFullSync),
+ new(addSyncPolicy),
}
}
diff --git a/backend/core/models/pipeline.go b/backend/core/models/pipeline.go
index 1bbb21086..0add6c66f 100644
--- a/backend/core/models/pipeline.go
+++ b/backend/core/models/pipeline.go
@@ -69,8 +69,7 @@ type Pipeline struct {
SpentSeconds int `json:"spentSeconds"`
Stage int `json:"stage"`
Labels []string `json:"labels" gorm:"-"`
- SkipOnFail bool `json:"skipOnFail"`
- FullSync bool `json:"fullSync"`
+ SyncPolicy `gorm:"embedded"`
}
// We use a 2D array because the request body must be an array of a set of
tasks
@@ -79,9 +78,8 @@ type NewPipeline struct {
Name string `json:"name"`
Plan PipelinePlan `json:"plan" swaggertype:"array,string"
example:"please check api /pipelines/<PLUGIN_NAME>/pipeline-plan"`
Labels []string `json:"labels"`
- SkipOnFail bool `json:"skipOnFail"`
- FullSync bool `json:"fullSync"`
BlueprintId uint64
+ SyncPolicy
}
func (Pipeline) TableName() string {
diff --git a/backend/core/plugin/plugin_blueprint.go
b/backend/core/plugin/plugin_blueprint.go
index db3412540..492e54d43 100644
--- a/backend/core/plugin/plugin_blueprint.go
+++ b/backend/core/plugin/plugin_blueprint.go
@@ -72,7 +72,6 @@ type DataSourcePluginBlueprintV200 interface {
MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*models.BlueprintScope,
- syncPolicy *models.SyncPolicy,
) (models.PipelinePlan, []Scope, errors.Error)
}
diff --git a/backend/core/runner/run_task.go b/backend/core/runner/run_task.go
index 009e4f951..96cb18cc3 100644
--- a/backend/core/runner/run_task.go
+++ b/backend/core/runner/run_task.go
@@ -54,8 +54,16 @@ func RunTask(
return err
}
syncPolicy := &models.SyncPolicy{}
- syncPolicy.SkipOnFail = dbPipeline.SkipOnFail
- syncPolicy.FullSync = dbPipeline.FullSync
+ blueprint := &models.Blueprint{}
+ if dbPipeline.BlueprintId != 0 {
+ if err := db.First(blueprint, dal.Where("id = ? ",
dbPipeline.BlueprintId)); err != nil {
+ return err
+ }
+ syncPolicy = &blueprint.SyncPolicy
+ }
+ if dbPipeline.FullSync {
+ syncPolicy.FullSync = true
+ }
logger, err := getTaskLogger(basicRes.GetLogger(), task)
if err != nil {
diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go
b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index 43f48f689..e55d1ca6b 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -37,12 +37,11 @@ type ApiCollectorStateManager struct {
// *GraphqlCollector
subtasks []plugin.SubTask
LatestState models.CollectorLatestState
- TimeAfter *time.Time
ExecuteStart time.Time
}
// NewStatefulApiCollector create a new ApiCollectorStateManager
-func NewStatefulApiCollector(args RawDataSubTaskArgs, timeAfter *time.Time)
(*ApiCollectorStateManager, errors.Error) {
+func NewStatefulApiCollector(args RawDataSubTaskArgs)
(*ApiCollectorStateManager, errors.Error) {
db := args.Ctx.GetDal()
rawDataSubTask, err := NewRawDataSubTask(args)
@@ -64,7 +63,6 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs,
timeAfter *time.Time) (*Ap
return &ApiCollectorStateManager{
RawDataSubTaskArgs: args,
LatestState: latestState,
- TimeAfter: timeAfter,
ExecuteStart: time.Now(),
}, nil
}
@@ -73,16 +71,16 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs,
timeAfter *time.Time) (*Ap
func (m *ApiCollectorStateManager) IsIncremental() bool {
prevSyncTime := m.LatestState.LatestSuccessStart
prevTimeAfter := m.LatestState.TimeAfter
- currTimeAfter := m.TimeAfter
syncPolicy := m.Ctx.TaskContext().SyncPolicy()
-
if syncPolicy != nil && syncPolicy.FullSync {
return false
}
+
if prevSyncTime == nil {
return false
}
// if we cleared the timeAfter, or moved timeAfter back in time, we
should do a full sync
+ currTimeAfter := syncPolicy.TimeAfter
if currTimeAfter != nil {
return prevTimeAfter == nil ||
!currTimeAfter.Before(*prevTimeAfter)
}
@@ -122,7 +120,11 @@ func (m *ApiCollectorStateManager) Execute() errors.Error {
db := m.Ctx.GetDal()
m.LatestState.LatestSuccessStart = &m.ExecuteStart
- m.LatestState.TimeAfter = m.TimeAfter
+ syncPolicy := m.Ctx.TaskContext().SyncPolicy()
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+ m.LatestState.TimeAfter = syncPolicy.TimeAfter
+ }
+
return db.CreateOrUpdate(&m.LatestState)
}
@@ -154,18 +156,19 @@ func NewStatefulApiCollectorForFinalizableEntity(args
FinalizableApiCollectorArg
Options: args.Options,
Params: args.Params,
Table: args.Table,
- }, args.TimeAfter)
+ })
if err != nil {
return nil, err
}
// // prepare the basic variables
+ syncPolicy := manager.Ctx.TaskContext().SyncPolicy()
var isIncremental = manager.IsIncremental()
var createdAfter *time.Time
if isIncremental {
createdAfter = manager.LatestState.LatestSuccessStart
- } else {
- createdAfter = manager.TimeAfter
+ } else if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+ createdAfter = syncPolicy.TimeAfter
}
// step 1: create a collector to collect newly added records
diff --git a/backend/plugins/bamboo/api/blueprint_V200_test.go
b/backend/plugins/bamboo/api/blueprint_V200_test.go
index 5fb5c8a86..bd3d1abe3 100644
--- a/backend/plugins/bamboo/api/blueprint_V200_test.go
+++ b/backend/plugins/bamboo/api/blueprint_V200_test.go
@@ -48,7 +48,6 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
const testScopeConfigName string = "bamboo scope config"
const testProxy string = ""
- syncPolicy := &coreModels.SyncPolicy{}
bpScopes := []*coreModels.BlueprintScope{
{
ScopeId: testKey,
@@ -148,7 +147,7 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
})
Init(basicRes, mockMeta)
- plans, scopes, err := MakePipelinePlanV200(testSubTaskMeta,
testConnectionID, bpScopes, syncPolicy)
+ plans, scopes, err := MakePipelinePlanV200(testSubTaskMeta,
testConnectionID, bpScopes)
assert.Equal(t, err, nil)
assert.Equal(t, expectPlans, plans)
diff --git a/backend/plugins/bamboo/api/blueprint_v200.go
b/backend/plugins/bamboo/api/blueprint_v200.go
index bbd2a3ae8..1b252a2eb 100644
--- a/backend/plugins/bamboo/api/blueprint_v200.go
+++ b/backend/plugins/bamboo/api/blueprint_v200.go
@@ -36,7 +36,6 @@ func MakePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
scope []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
var err errors.Error
connection := new(models.BambooConnection)
@@ -50,7 +49,7 @@ func MakePipelinePlanV200(
return nil, nil, err
}
- pp, err := makePipelinePlanV200(subtaskMetas, scope, connection,
syncPolicy)
+ pp, err := makePipelinePlanV200(subtaskMetas, scope, connection)
if err != nil {
return nil, nil, err
}
@@ -84,7 +83,7 @@ func makeScopeV200(connectionId uint64, scopes
[]*coreModels.BlueprintScope) ([]
func makePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
scopes []*coreModels.BlueprintScope,
- connection *models.BambooConnection, syncPolicy *coreModels.SyncPolicy,
+ connection *models.BambooConnection,
) (coreModels.PipelinePlan, errors.Error) {
plans := make(coreModels.PipelinePlan, 0, len(scopes))
for _, scope := range scopes {
diff --git a/backend/plugins/bamboo/impl/impl.go
b/backend/plugins/bamboo/impl/impl.go
index 69d77b3f4..529089a1e 100644
--- a/backend/plugins/bamboo/impl/impl.go
+++ b/backend/plugins/bamboo/impl/impl.go
@@ -68,9 +68,8 @@ func (p Bamboo) ScopeConfig() dal.Tabler {
func (p Bamboo) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
- return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes,
syncPolicy)
+ return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes)
}
func (p Bamboo) GetTablesInfo() []dal.Tabler {
diff --git a/backend/plugins/bitbucket/api/blueprint_V200_test.go
b/backend/plugins/bitbucket/api/blueprint_V200_test.go
index 0a878c2e1..eb9d3534f 100644
--- a/backend/plugins/bitbucket/api/blueprint_V200_test.go
+++ b/backend/plugins/bitbucket/api/blueprint_V200_test.go
@@ -68,10 +68,9 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
}
bpScopes := make([]*coreModels.BlueprintScope, 0)
bpScopes = append(bpScopes, bs)
- syncPolicy := &coreModels.SyncPolicy{}
plan := make(coreModels.PipelinePlan, len(bpScopes))
- plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes,
connection, syncPolicy)
+ plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes,
connection)
assert.Nil(t, err)
scopes, err := makeScopesV200(bpScopes, connection)
assert.Nil(t, err)
diff --git a/backend/plugins/bitbucket/api/blueprint_v200.go
b/backend/plugins/bitbucket/api/blueprint_v200.go
index e438a4a68..d1b562103 100644
--- a/backend/plugins/bitbucket/api/blueprint_v200.go
+++ b/backend/plugins/bitbucket/api/blueprint_v200.go
@@ -19,7 +19,6 @@ package api
import (
"net/url"
- "time"
"github.com/apache/incubator-devlake/core/errors"
coreModels "github.com/apache/incubator-devlake/core/models"
@@ -39,7 +38,6 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
// get the connection info for url
connection := &models.BitbucketConnection{}
@@ -49,7 +47,7 @@ func MakeDataSourcePipelinePlanV200(
}
plan := make(coreModels.PipelinePlan, len(bpScopes))
- plan, err = makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connection, syncPolicy)
+ plan, err = makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connection)
if err != nil {
return nil, nil, err
}
@@ -66,7 +64,6 @@ func makeDataSourcePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connection *models.BitbucketConnection,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, bpScope := range bpScopes {
stage := plan[i]
@@ -101,9 +98,6 @@ func makeDataSourcePipelinePlanV200(
ConnectionId: repo.ConnectionId,
FullName: repo.BitbucketId,
}
- if syncPolicy.TimeAfter != nil {
- op.TimeAfter = syncPolicy.TimeAfter.Format(time.RFC3339)
- }
options, err := tasks.EncodeTaskOptions(op)
if err != nil {
return nil, err
diff --git a/backend/plugins/bitbucket/bitbucket.go
b/backend/plugins/bitbucket/bitbucket.go
index fa36b7ad3..6c73f06cf 100644
--- a/backend/plugins/bitbucket/bitbucket.go
+++ b/backend/plugins/bitbucket/bitbucket.go
@@ -31,7 +31,6 @@ func main() {
cmd := &cobra.Command{Use: "bitbucket"}
connectionId := cmd.Flags().Uint64P("connectionId", "c", 0, "bitbucket
connection id")
fullName := cmd.Flags().StringP("fullName", "n", "", "bitbucket id:
owner/repo")
- timeAfter := cmd.Flags().StringP("timeAfter", "a", "", "collect data
that are created after specified time, ie 2006-05-06T07:08:09Z")
deploymentPattern := cmd.Flags().StringP("deployment", "", "",
"deployment pattern")
productionPattern := cmd.Flags().StringP("production", "", "",
"production pattern")
_ = cmd.MarkFlagRequired("connectionId")
@@ -41,7 +40,6 @@ func main() {
runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
"connectionId": *connectionId,
"fullName": *fullName,
- "timeAfter": *timeAfter,
"scopeConfig": map[string]string{
"deploymentPattern": *deploymentPattern,
"productionPattern": *productionPattern,
diff --git a/backend/plugins/bitbucket/impl/impl.go
b/backend/plugins/bitbucket/impl/impl.go
index 275e97377..57c243418 100644
--- a/backend/plugins/bitbucket/impl/impl.go
+++ b/backend/plugins/bitbucket/impl/impl.go
@@ -19,7 +19,6 @@ package impl
import (
"fmt"
- "time"
"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/dal"
@@ -164,13 +163,6 @@ func (p Bitbucket) PrepareTaskData(taskCtx
plugin.TaskContext, options map[strin
return nil, err
}
- var timeAfter time.Time
- if op.TimeAfter != "" {
- timeAfter, err = errors.Convert01(time.Parse(time.RFC3339,
op.TimeAfter))
- if err != nil {
- return nil, errors.BadInput.Wrap(err, "invalid value
for `timeAfter`")
- }
- }
regexEnricher := helper.NewRegexEnricher()
if err := regexEnricher.TryAdd(devops.DEPLOYMENT,
op.DeploymentPattern); err != nil {
return nil, errors.BadInput.Wrap(err, "invalid value for
`deploymentPattern`")
@@ -183,10 +175,6 @@ func (p Bitbucket) PrepareTaskData(taskCtx
plugin.TaskContext, options map[strin
ApiClient: apiClient,
RegexEnricher: regexEnricher,
}
- if !timeAfter.IsZero() {
- taskData.TimeAfter = &timeAfter
- logger.Debug("collect data updated timeAfter %s", timeAfter)
- }
return taskData, nil
}
@@ -201,9 +189,8 @@ func (p Bitbucket) MigrationScripts()
[]plugin.MigrationScript {
func (p Bitbucket) MakeDataSourcePipelinePlanV200(
connectionId uint64,
- scopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy) (pp coreModels.PipelinePlan, sc
[]plugin.Scope, err errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
+ scopes []*coreModels.BlueprintScope) (pp coreModels.PipelinePlan, sc
[]plugin.Scope, err errors.Error) {
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes)
}
func (p Bitbucket) ApiResources()
map[string]map[string]plugin.ApiResourceHandler {
diff --git a/backend/plugins/bitbucket/tasks/api_common.go
b/backend/plugins/bitbucket/tasks/api_common.go
index 5aa43a0fe..0910ff82e 100644
--- a/backend/plugins/bitbucket/tasks/api_common.go
+++ b/backend/plugins/bitbucket/tasks/api_common.go
@@ -20,15 +20,16 @@ package tasks
import (
"encoding/json"
"fmt"
- "github.com/apache/incubator-devlake/core/dal"
- "github.com/apache/incubator-devlake/core/errors"
- plugin "github.com/apache/incubator-devlake/core/plugin"
- "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"io"
"net/http"
"net/url"
"reflect"
"time"
+
+ "github.com/apache/incubator-devlake/core/dal"
+ "github.com/apache/incubator-devlake/core/errors"
+ plugin "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
)
type BitbucketApiParams struct {
@@ -100,11 +101,12 @@ func GetQueryCreatedAndUpdated(fields string,
collectorWithState *api.ApiCollect
}
query.Set("fields", fields)
query.Set("sort", "created_on")
+ syncPolicy := collectorWithState.Ctx.TaskContext().SyncPolicy()
if collectorWithState.IsIncremental() {
latestSuccessStart :=
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339)
query.Set("q", fmt.Sprintf("updated_on>=%s",
latestSuccessStart))
- } else if collectorWithState.TimeAfter != nil {
- timeAfter :=
collectorWithState.TimeAfter.Format(time.RFC3339)
+ } else if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+ timeAfter := syncPolicy.TimeAfter.Format(time.RFC3339)
query.Set("q", fmt.Sprintf("updated_on>=%s", timeAfter))
}
diff --git a/backend/plugins/bitbucket/tasks/issue_collector.go
b/backend/plugins/bitbucket/tasks/issue_collector.go
index ef5f56fd5..a5f2e4a41 100644
--- a/backend/plugins/bitbucket/tasks/issue_collector.go
+++ b/backend/plugins/bitbucket/tasks/issue_collector.go
@@ -35,7 +35,7 @@ var CollectApiIssuesMeta = plugin.SubTaskMeta{
func CollectApiIssues(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_ISSUE_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
diff --git a/backend/plugins/bitbucket/tasks/issue_comment_collector.go
b/backend/plugins/bitbucket/tasks/issue_comment_collector.go
index 8f85ceace..8e0036c4a 100644
--- a/backend/plugins/bitbucket/tasks/issue_comment_collector.go
+++ b/backend/plugins/bitbucket/tasks/issue_comment_collector.go
@@ -36,7 +36,7 @@ var CollectApiIssueCommentsMeta = plugin.SubTaskMeta{
func CollectApiIssueComments(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_ISSUE_COMMENTS_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
diff --git a/backend/plugins/bitbucket/tasks/pipeline_collector.go
b/backend/plugins/bitbucket/tasks/pipeline_collector.go
index 58a5f6a4c..e25861936 100644
--- a/backend/plugins/bitbucket/tasks/pipeline_collector.go
+++ b/backend/plugins/bitbucket/tasks/pipeline_collector.go
@@ -35,7 +35,7 @@ var CollectApiPipelinesMeta = plugin.SubTaskMeta{
func CollectApiPipelines(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_PIPELINE_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
diff --git a/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
b/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
index b03fdfefe..99cc1e39b 100644
--- a/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
+++ b/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
@@ -38,7 +38,7 @@ var CollectPipelineStepsMeta = plugin.SubTaskMeta{
func CollectPipelineSteps(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_PIPELINE_STEPS_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
diff --git a/backend/plugins/bitbucket/tasks/pr_collector.go
b/backend/plugins/bitbucket/tasks/pr_collector.go
index aab2c6b13..7379cc1e4 100644
--- a/backend/plugins/bitbucket/tasks/pr_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_collector.go
@@ -38,7 +38,7 @@ var CollectApiPullRequestsMeta = plugin.SubTaskMeta{
func CollectApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_PULL_REQUEST_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
diff --git a/backend/plugins/bitbucket/tasks/pr_comment_collector.go
b/backend/plugins/bitbucket/tasks/pr_comment_collector.go
index 70bd44433..c4d539f85 100644
--- a/backend/plugins/bitbucket/tasks/pr_comment_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_comment_collector.go
@@ -36,7 +36,7 @@ var CollectApiPrCommentsMeta = plugin.SubTaskMeta{
func CollectApiPullRequestsComments(taskCtx plugin.SubTaskContext)
errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_PULL_REQUEST_COMMENTS_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
diff --git a/backend/plugins/bitbucket/tasks/pr_commit_collector.go
b/backend/plugins/bitbucket/tasks/pr_commit_collector.go
index 14c53d5d2..9d99da39c 100644
--- a/backend/plugins/bitbucket/tasks/pr_commit_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_commit_collector.go
@@ -19,10 +19,11 @@ package tasks
import (
"fmt"
+ "net/url"
+
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
- "net/url"
)
const RAW_PULL_REQUEST_COMMITS_TABLE = "bitbucket_api_pull_request_commits"
@@ -37,7 +38,7 @@ var CollectApiPrCommitsMeta = plugin.SubTaskMeta{
func CollectApiPullRequestCommits(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_PULL_REQUEST_COMMITS_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
diff --git a/backend/plugins/bitbucket/tasks/task_data.go
b/backend/plugins/bitbucket/tasks/task_data.go
index 6ecb87689..235b05080 100644
--- a/backend/plugins/bitbucket/tasks/task_data.go
+++ b/backend/plugins/bitbucket/tasks/task_data.go
@@ -18,8 +18,6 @@ limitations under the License.
package tasks
import (
- "time"
-
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/bitbucket/models"
@@ -29,7 +27,6 @@ type BitbucketOptions struct {
ConnectionId uint64 `json:"connectionId"
mapstructure:"connectionId,omitempty"`
Tasks []string `json:"tasks,omitempty"
mapstructure:",omitempty"`
FullName string `json:"fullName"
mapstructure:"fullName"`
- TimeAfter string `json:"timeAfter"
mapstructure:"timeAfter,omitempty"`
ScopeConfigId uint64 `json:"scopeConfigId"
mapstructure:"scopeConfigId,omitempty"`
*models.BitbucketScopeConfig `mapstructure:"scopeConfig,omitempty"
json:"scopeConfig"`
}
@@ -37,7 +34,6 @@ type BitbucketOptions struct {
type BitbucketTaskData struct {
Options *BitbucketOptions
ApiClient *api.ApiAsyncClient
- TimeAfter *time.Time
RegexEnricher *api.RegexEnricher
}
diff --git a/backend/plugins/circleci/api/blueprint200.go
b/backend/plugins/circleci/api/blueprint200.go
index 9f5a8d55b..57bb1e6ed 100644
--- a/backend/plugins/circleci/api/blueprint200.go
+++ b/backend/plugins/circleci/api/blueprint200.go
@@ -19,7 +19,6 @@ package api
import (
"fmt"
- "time"
"github.com/apache/incubator-devlake/core/models/domainlayer/devops"
"github.com/apache/incubator-devlake/plugins/circleci/models"
@@ -38,10 +37,9 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
plan := make(coreModels.PipelinePlan, len(bpScopes))
- plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connectionId, syncPolicy)
+ plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connectionId)
if err != nil {
return nil, nil, err
}
@@ -58,7 +56,6 @@ func makeDataSourcePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connectionId uint64,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, bpScope := range bpScopes {
stage := plan[i]
@@ -73,9 +70,6 @@ func makeDataSourcePipelinePlanV200(
}
options["projectSlug"] = project.Slug
options["connectionId"] = connectionId
- if syncPolicy.TimeAfter != nil {
- options["timeAfter"] =
syncPolicy.TimeAfter.Format(time.RFC3339)
- }
// get scope config from db
_, scopeConfig, err :=
scopeHelper.DbHelper().GetScopeAndConfig(connectionId, bpScope.ScopeId)
diff --git a/backend/plugins/circleci/impl/impl.go
b/backend/plugins/circleci/impl/impl.go
index 6d7a97601..df9d9605b 100644
--- a/backend/plugins/circleci/impl/impl.go
+++ b/backend/plugins/circleci/impl/impl.go
@@ -19,7 +19,6 @@ package impl
import (
"fmt"
- "time"
"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/dal"
@@ -90,9 +89,8 @@ func (p Circleci) SubTaskMetas() []plugin.SubTaskMeta {
func (p Circleci) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes)
}
func (p Circleci) PrepareTaskData(taskCtx plugin.TaskContext, options
map[string]interface{}) (interface{}, errors.Error) {
@@ -119,16 +117,6 @@ func (p Circleci) PrepareTaskData(taskCtx
plugin.TaskContext, options map[string
Options: op,
ApiClient: apiClient,
}
- var createdDateAfter time.Time
- if op.TimeAfter != "" {
- createdDateAfter, err =
errors.Convert01(time.Parse(time.RFC3339, op.TimeAfter))
- if err != nil {
- return nil, errors.BadInput.Wrap(err, "invalid value
for `createdDateAfter`")
- }
- }
- if !createdDateAfter.IsZero() {
- taskData.TimeAfter = &createdDateAfter
- }
// fallback to project scope config
if op.ScopeConfigId == 0 {
project := &models.CircleciProject{}
diff --git a/backend/plugins/circleci/tasks/task_data.go
b/backend/plugins/circleci/tasks/task_data.go
index 6f5e3daf1..5c3028694 100644
--- a/backend/plugins/circleci/tasks/task_data.go
+++ b/backend/plugins/circleci/tasks/task_data.go
@@ -18,8 +18,6 @@ limitations under the License.
package tasks
import (
- "time"
-
"github.com/apache/incubator-devlake/core/errors"
helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/circleci/models"
@@ -29,7 +27,6 @@ type CircleciOptions struct {
ConnectionId uint64 `json:"connectionId"`
ProjectSlug string `json:"projectSlug"`
PageSize uint64 `mapstruct:"pageSize"`
- TimeAfter string `json:"timeAfter"
mapstructure:"timeAfter,omitempty"`
ScopeConfigId uint64 `json:"scopeConfigId"
mapstructure:"scopeConfigId,omitempty"`
ScopeConfig *models.CircleciScopeConfig `json:"scopeConfig"
mapstructure:"scopeConfig,omitempty"`
}
@@ -37,7 +34,6 @@ type CircleciOptions struct {
type CircleciTaskData struct {
Options *CircleciOptions
ApiClient *helper.ApiAsyncClient
- TimeAfter *time.Time
RegexEnricher *helper.RegexEnricher
}
diff --git a/backend/plugins/github/api/blueprint_V200_test.go
b/backend/plugins/github/api/blueprint_V200_test.go
index 718bcb5b2..96c4c5ce0 100644
--- a/backend/plugins/github/api/blueprint_V200_test.go
+++ b/backend/plugins/github/api/blueprint_V200_test.go
@@ -69,10 +69,9 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
}
bpScopes := make([]*coreModels.BlueprintScope, 0)
bpScopes = append(bpScopes, bs)
- syncPolicy := &coreModels.SyncPolicy{}
plan := make(coreModels.PipelinePlan, len(bpScopes))
- plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes,
connection, syncPolicy)
+ plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes,
connection)
assert.Nil(t, err)
scopes, err := makeScopesV200(bpScopes, connection)
assert.Nil(t, err)
diff --git a/backend/plugins/github/api/blueprint_v200.go
b/backend/plugins/github/api/blueprint_v200.go
index e405e52c3..b6b508930 100644
--- a/backend/plugins/github/api/blueprint_v200.go
+++ b/backend/plugins/github/api/blueprint_v200.go
@@ -25,7 +25,6 @@ import (
"net/http"
"net/url"
"strings"
- "time"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
@@ -47,7 +46,6 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
// get the connection info for url
connection := &models.GithubConnection{}
@@ -64,7 +62,7 @@ func MakeDataSourcePipelinePlanV200(
}
plan := make(coreModels.PipelinePlan, len(bpScopes))
- plan, err = makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connection, syncPolicy)
+ plan, err = makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connection)
if err != nil {
return nil, nil, err
}
@@ -81,7 +79,6 @@ func makeDataSourcePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connection *models.GithubConnection,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, bpScope := range bpScopes {
stage := plan[i]
@@ -117,9 +114,6 @@ func makeDataSourcePipelinePlanV200(
GithubId: githubRepo.GithubId,
Name: githubRepo.FullName,
}
- if syncPolicy.TimeAfter != nil {
- op.TimeAfter = syncPolicy.TimeAfter.Format(time.RFC3339)
- }
options, err := tasks.EncodeTaskOptions(op)
if err != nil {
return nil, err
diff --git a/backend/plugins/github/github.go b/backend/plugins/github/github.go
index 5dd7f8c03..de811aadf 100644
--- a/backend/plugins/github/github.go
+++ b/backend/plugins/github/github.go
@@ -32,7 +32,6 @@ func main() {
connectionId := cmd.Flags().Uint64P("connectionId", "c", 0, "github
connection id")
owner := cmd.Flags().StringP("owner", "o", "", "github owner")
repo := cmd.Flags().StringP("repo", "r", "", "github repo")
- timeAfter := cmd.Flags().StringP("timeAfter", "a", "", "collect data
that are created after specified time, ie 2006-05-06T07:08:09Z")
_ = cmd.MarkFlagRequired("connectionId")
_ = cmd.MarkFlagRequired("owner")
_ = cmd.MarkFlagRequired("repo")
@@ -54,7 +53,6 @@ func main() {
"connectionId": *connectionId,
"owner": *owner,
"repo": *repo,
- "timeAfter": *timeAfter,
"scopeConfig": map[string]interface{}{
"prType": *prType,
"prComponent": *prComponent,
diff --git a/backend/plugins/github/impl/impl.go
b/backend/plugins/github/impl/impl.go
index 12481fe0e..2a2d0d4ed 100644
--- a/backend/plugins/github/impl/impl.go
+++ b/backend/plugins/github/impl/impl.go
@@ -19,7 +19,6 @@ package impl
import (
"fmt"
- "time"
"github.com/apache/incubator-devlake/helpers/pluginhelper/subtaskmeta/sorter"
@@ -159,15 +158,6 @@ func (p Github) PrepareTaskData(taskCtx
plugin.TaskContext, options map[string]i
RegexEnricher: regexEnricher,
}
- if op.TimeAfter != "" {
- var timeAfter time.Time
- timeAfter, err = errors.Convert01(time.Parse(time.RFC3339,
op.TimeAfter))
- if err != nil {
- return nil, errors.BadInput.Wrap(err, "invalid value
for `timeAfter`")
- }
- taskData.TimeAfter = &timeAfter
- logger.Debug("collect data updated timeAfter %s", timeAfter)
- }
return taskData, nil
}
@@ -226,9 +216,8 @@ func (p Github) ApiResources()
map[string]map[string]plugin.ApiResourceHandler {
func (p Github) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes)
}
func (p Github) Close(taskCtx plugin.TaskContext) errors.Error {
diff --git a/backend/plugins/github/tasks/cicd_job_collector.go
b/backend/plugins/github/tasks/cicd_job_collector.go
index 787476c31..73fa2ab0d 100644
--- a/backend/plugins/github/tasks/cicd_job_collector.go
+++ b/backend/plugins/github/tasks/cicd_job_collector.go
@@ -59,7 +59,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
Name: data.Options.Name,
},
Table: RAW_JOB_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
diff --git a/backend/plugins/github/tasks/cicd_run_collector.go
b/backend/plugins/github/tasks/cicd_run_collector.go
index 4d9158fcf..f29958575 100644
--- a/backend/plugins/github/tasks/cicd_run_collector.go
+++ b/backend/plugins/github/tasks/cicd_run_collector.go
@@ -78,7 +78,6 @@ func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error {
Table: RAW_RUN_TABLE,
},
ApiClient: data.ApiClient,
- TimeAfter: data.TimeAfter,
CollectNewRecordsByList: helper.FinalizableApiCollectorListArgs{
PageSize: PAGE_SIZE,
Concurrency: 10,
diff --git a/backend/plugins/github/tasks/comment_collector.go
b/backend/plugins/github/tasks/comment_collector.go
index 402bfa4e8..21e57f853 100644
--- a/backend/plugins/github/tasks/comment_collector.go
+++ b/backend/plugins/github/tasks/comment_collector.go
@@ -53,12 +53,13 @@ func CollectApiComments(taskCtx plugin.SubTaskContext)
errors.Error {
Name: data.Options.Name,
},
Table: RAW_COMMENTS_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
incremental := collectorWithState.IsIncremental()
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
@@ -68,11 +69,11 @@ func CollectApiComments(taskCtx plugin.SubTaskContext)
errors.Error {
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- if data.TimeAfter != nil {
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
// Note that `since` is for filtering records
by the `updated` time
// which is not ideal for semantic reasons and
would result in slightly more records than expected.
// But we have no choice since it is the only
available field we could exploit from the API.
- query.Set("since", data.TimeAfter.String())
+ query.Set("since",
syncPolicy.TimeAfter.String())
}
// if incremental == true, we overwrite it
if incremental {
diff --git a/backend/plugins/github/tasks/commit_collector.go
b/backend/plugins/github/tasks/commit_collector.go
index 33e005331..c8ed10780 100644
--- a/backend/plugins/github/tasks/commit_collector.go
+++ b/backend/plugins/github/tasks/commit_collector.go
@@ -53,12 +53,13 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext)
errors.Error {
Name: data.Options.Name,
},
Table: RAW_COMMIT_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
incremental := collectorWithState.IsIncremental()
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
@@ -79,8 +80,8 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext)
errors.Error {
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- if data.TimeAfter != nil {
- query.Set("since", data.TimeAfter.String())
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+ query.Set("since",
syncPolicy.TimeAfter.String())
}
if incremental {
query.Set("since",
collectorWithState.LatestState.LatestSuccessStart.String())
diff --git a/backend/plugins/github/tasks/event_collector.go
b/backend/plugins/github/tasks/event_collector.go
index 7652f068f..491cc021c 100644
--- a/backend/plugins/github/tasks/event_collector.go
+++ b/backend/plugins/github/tasks/event_collector.go
@@ -69,7 +69,6 @@ func CollectApiEvents(taskCtx plugin.SubTaskContext)
errors.Error {
Table: RAW_EVENTS_TABLE,
},
ApiClient: data.ApiClient,
- TimeAfter: data.TimeAfter, // set to nil to disable timeFilter
CollectNewRecordsByList: helper.FinalizableApiCollectorListArgs{
PageSize: 100,
Concurrency: 10,
diff --git a/backend/plugins/github/tasks/issue_collector.go
b/backend/plugins/github/tasks/issue_collector.go
index f88942319..551081c2d 100644
--- a/backend/plugins/github/tasks/issue_collector.go
+++ b/backend/plugins/github/tasks/issue_collector.go
@@ -53,12 +53,13 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
Name: data.Options.Name,
},
Table: RAW_ISSUE_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
incremental := collectorWithState.IsIncremental()
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
@@ -79,8 +80,8 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- if data.TimeAfter != nil {
- query.Set("since", data.TimeAfter.String())
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+ query.Set("since",
syncPolicy.TimeAfter.String())
}
if incremental {
query.Set("since",
collectorWithState.LatestState.LatestSuccessStart.String())
diff --git a/backend/plugins/github/tasks/pr_collector.go
b/backend/plugins/github/tasks/pr_collector.go
index f94e96539..a83cd3085 100644
--- a/backend/plugins/github/tasks/pr_collector.go
+++ b/backend/plugins/github/tasks/pr_collector.go
@@ -72,7 +72,6 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext)
errors.Error {
Table: RAW_PULL_REQUEST_TABLE,
},
ApiClient: data.ApiClient,
- TimeAfter: data.TimeAfter, // set to nil to disable timeFilter
CollectNewRecordsByList: helper.FinalizableApiCollectorListArgs{
PageSize: 100,
Concurrency: 10,
diff --git a/backend/plugins/github/tasks/pr_commit_collector.go
b/backend/plugins/github/tasks/pr_commit_collector.go
index 30ac310e8..0a209ff2e 100644
--- a/backend/plugins/github/tasks/pr_commit_collector.go
+++ b/backend/plugins/github/tasks/pr_commit_collector.go
@@ -68,7 +68,7 @@ func CollectApiPullRequestCommits(taskCtx
plugin.SubTaskContext) errors.Error {
Name: data.Options.Name,
},
Table: RAW_PR_COMMIT_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
diff --git a/backend/plugins/github/tasks/pr_review_collector.go
b/backend/plugins/github/tasks/pr_review_collector.go
index 268f565ce..18bbde6b3 100644
--- a/backend/plugins/github/tasks/pr_review_collector.go
+++ b/backend/plugins/github/tasks/pr_review_collector.go
@@ -60,7 +60,7 @@ func CollectApiPullRequestReviews(taskCtx
plugin.SubTaskContext) errors.Error {
Name: data.Options.Name,
},
Table: RAW_PR_REVIEW_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
diff --git a/backend/plugins/github/tasks/pr_review_comment_collector.go
b/backend/plugins/github/tasks/pr_review_comment_collector.go
index 48661a335..bd03b032b 100644
--- a/backend/plugins/github/tasks/pr_review_comment_collector.go
+++ b/backend/plugins/github/tasks/pr_review_comment_collector.go
@@ -56,12 +56,13 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext)
errors.Error {
Name: data.Options.Name,
},
Table: RAW_PR_REVIEW_COMMENTS_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
incremental := collectorWithState.IsIncremental()
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
@@ -76,11 +77,11 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext)
errors.Error {
UrlTemplate: "repos/{{ .Params.Name }}/pulls/comments",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- if data.TimeAfter != nil {
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
// Note that `since` is for filtering records
by the `updated` time
// which is not ideal for semantic reasons and
would result in slightly more records than expected.
// But we have no choice since it is the only
available field we could exploit from the API.
- query.Set("since", data.TimeAfter.String())
+ query.Set("since",
syncPolicy.TimeAfter.String())
}
// if incremental == true, we overwrite it
if incremental {
diff --git a/backend/plugins/github/tasks/task_data.go
b/backend/plugins/github/tasks/task_data.go
index 8dcc777f8..714dbbd95 100644
--- a/backend/plugins/github/tasks/task_data.go
+++ b/backend/plugins/github/tasks/task_data.go
@@ -20,7 +20,6 @@ package tasks
import (
"fmt"
"strings"
- "time"
"github.com/apache/incubator-devlake/core/errors"
helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
@@ -31,7 +30,6 @@ type GithubOptions struct {
ConnectionId uint64 `json:"connectionId"
mapstructure:"connectionId,omitempty"`
ScopeConfigId uint64 `json:"scopeConfigId"
mapstructure:"scopeConfigId,omitempty"`
GithubId int `json:"githubId"
mapstructure:"githubId,omitempty"`
- TimeAfter string `json:"timeAfter"
mapstructure:"timeAfter,omitempty"`
Owner string `json:"owner"
mapstructure:"owner,omitempty"`
Repo string `json:"repo"
mapstructure:"repo,omitempty"`
Name string `json:"name"
mapstructure:"name,omitempty"`
@@ -42,7 +40,6 @@ type GithubTaskData struct {
Options *GithubOptions
ApiClient *helper.ApiAsyncClient
GraphqlClient *helper.GraphqlAsyncClient
- TimeAfter *time.Time
RegexEnricher *helper.RegexEnricher
}
diff --git a/backend/plugins/github_graphql/impl/impl.go
b/backend/plugins/github_graphql/impl/impl.go
index a1ef349a5..cf39093a3 100644
--- a/backend/plugins/github_graphql/impl/impl.go
+++ b/backend/plugins/github_graphql/impl/impl.go
@@ -232,15 +232,6 @@ func (p GithubGraphql) PrepareTaskData(taskCtx
plugin.TaskContext, options map[s
GraphqlClient: graphqlClient,
RegexEnricher: regexEnricher,
}
- if op.TimeAfter != "" {
- var timeAfter time.Time
- timeAfter, err = errors.Convert01(time.Parse(time.RFC3339,
op.TimeAfter))
- if err != nil {
- return nil, errors.BadInput.Wrap(err, "invalid value
for `timeAfter`")
- }
- taskData.TimeAfter = &timeAfter
- logger.Debug("collect data updated timeAfter %s", timeAfter)
- }
return taskData, nil
}
diff --git a/backend/plugins/github_graphql/plugin_main.go
b/backend/plugins/github_graphql/plugin_main.go
index e9edf721b..3f1777979 100644
--- a/backend/plugins/github_graphql/plugin_main.go
+++ b/backend/plugins/github_graphql/plugin_main.go
@@ -32,7 +32,6 @@ func main() {
connectionId := cmd.Flags().Uint64P("connectionId", "c", 0, "github
connection id")
owner := cmd.Flags().StringP("owner", "o", "", "github owner")
repo := cmd.Flags().StringP("repo", "r", "", "github repo")
- timeAfter := cmd.Flags().StringP("timeAfter", "a", "", "collect data
that are updated/created after specified time, ie 2006-05-06T07:08:09Z")
_ = cmd.MarkFlagRequired("connectionId")
_ = cmd.MarkFlagRequired("owner")
_ = cmd.MarkFlagRequired("repo")
@@ -42,7 +41,6 @@ func main() {
"connectionId": *connectionId,
"owner": *owner,
"repo": *repo,
- "timeAfter": *timeAfter,
})
}
runner.RunCmd(cmd)
diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go
b/backend/plugins/github_graphql/tasks/issue_collector.go
index 768822494..ece8351ba 100644
--- a/backend/plugins/github_graphql/tasks/issue_collector.go
+++ b/backend/plugins/github_graphql/tasks/issue_collector.go
@@ -104,13 +104,13 @@ func CollectIssue(taskCtx plugin.SubTaskContext)
errors.Error {
Name: data.Options.Name,
},
Table: RAW_ISSUES_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
incremental := collectorWithState.IsIncremental()
-
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
err =
collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
GraphqlClient: data.GraphqlClient,
PageSize: 100,
@@ -123,8 +123,8 @@ func CollectIssue(taskCtx plugin.SubTaskContext)
errors.Error {
since := helper.DateTime{}
if incremental {
since = helper.DateTime{Time:
*collectorWithState.LatestState.LatestSuccessStart}
- } else if collectorWithState.TimeAfter != nil {
- since = helper.DateTime{Time:
*collectorWithState.TimeAfter}
+ } else if syncPolicy != nil && syncPolicy.TimeAfter !=
nil {
+ since = helper.DateTime{Time:
*syncPolicy.TimeAfter}
}
ownerName := strings.Split(data.Options.Name, "/")
variables := map[string]interface{}{
diff --git a/backend/plugins/github_graphql/tasks/job_collector.go
b/backend/plugins/github_graphql/tasks/job_collector.go
index 805a86c5e..095276c4a 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -111,7 +111,7 @@ func CollectGraphqlJobs(taskCtx plugin.SubTaskContext)
errors.Error {
Name: data.Options.Name,
},
Table: RAW_GRAPHQL_JOBS_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go
b/backend/plugins/github_graphql/tasks/pr_collector.go
index cc854c7f0..ae79c5a37 100644
--- a/backend/plugins/github_graphql/tasks/pr_collector.go
+++ b/backend/plugins/github_graphql/tasks/pr_collector.go
@@ -155,12 +155,13 @@ func CollectPr(taskCtx plugin.SubTaskContext)
errors.Error {
Name: data.Options.Name,
},
Table: RAW_PRS_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
incremental := collectorWithState.IsIncremental()
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitGraphQLCollector(api.GraphqlCollectorArgs{
GraphqlClient: data.GraphqlClient,
@@ -195,7 +196,7 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
isFinish := false
for _, rawL := range prs {
// collect data even though in increment mode
because of updating existing data
- if collectorWithState.TimeAfter != nil &&
!collectorWithState.TimeAfter.Before(rawL.UpdatedAt) {
+ if syncPolicy != nil && syncPolicy.TimeAfter !=
nil && !syncPolicy.TimeAfter.Before(rawL.UpdatedAt) {
isFinish = true
break
}
diff --git a/backend/plugins/gitlab/api/blueprint_V200_test.go
b/backend/plugins/gitlab/api/blueprint_V200_test.go
index 99bc27062..aa366b73b 100644
--- a/backend/plugins/gitlab/api/blueprint_V200_test.go
+++ b/backend/plugins/gitlab/api/blueprint_V200_test.go
@@ -52,7 +52,6 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
const testScopeConfigName string = "gitlab scope config"
const testProxy string = ""
- syncPolicy := &coreModels.SyncPolicy{}
bpScopes := []*coreModels.BlueprintScope{
{
ScopeId: strconv.Itoa(testID),
@@ -201,7 +200,7 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
})
Init(mockRes, mockMeta)
- plans, scopes, err := MakePipelinePlanV200(testSubTaskMeta,
testConnectionID, bpScopes, syncPolicy)
+ plans, scopes, err := MakePipelinePlanV200(testSubTaskMeta,
testConnectionID, bpScopes)
assert.Equal(t, err, nil)
assert.Equal(t, expectPlans, plans)
diff --git a/backend/plugins/gitlab/api/blueprint_v200.go
b/backend/plugins/gitlab/api/blueprint_v200.go
index d7197cb5d..7b6fba3cb 100644
--- a/backend/plugins/gitlab/api/blueprint_v200.go
+++ b/backend/plugins/gitlab/api/blueprint_v200.go
@@ -24,7 +24,6 @@ import (
"net/http"
"net/url"
"strconv"
- "time"
"github.com/apache/incubator-devlake/plugins/gitlab/tasks"
@@ -46,7 +45,6 @@ func MakePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
scope []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
var err errors.Error
connection := new(models.GitlabConnection)
@@ -60,7 +58,7 @@ func MakePipelinePlanV200(
return nil, nil, err
}
- pp, err := makePipelinePlanV200(subtaskMetas, scope, connection,
syncPolicy)
+ pp, err := makePipelinePlanV200(subtaskMetas, scope, connection)
if err != nil {
return nil, nil, err
}
@@ -114,7 +112,7 @@ func makeScopeV200(connectionId uint64, scopes
[]*coreModels.BlueprintScope) ([]
func makePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
scopes []*coreModels.BlueprintScope,
- connection *models.GitlabConnection, syncPolicy *coreModels.SyncPolicy,
+ connection *models.GitlabConnection,
) (coreModels.PipelinePlan, errors.Error) {
plans := make(coreModels.PipelinePlan, 0, 3*len(scopes))
for _, scope := range scopes {
@@ -137,9 +135,6 @@ func makePipelinePlanV200(
options["connectionId"] = connection.ID
options["projectId"] = intScopeId
options["scopeConfigId"] = scopeConfig.ID
- if syncPolicy.TimeAfter != nil {
- options["timeAfter"] =
syncPolicy.TimeAfter.Format(time.RFC3339)
- }
// construct subtasks
subtasks, err := helper.MakePipelinePlanSubtasks(subtaskMetas,
scopeConfig.Entities)
diff --git a/backend/plugins/gitlab/gitlab.go b/backend/plugins/gitlab/gitlab.go
index 5a6d2701c..86f3bac9d 100644
--- a/backend/plugins/gitlab/gitlab.go
+++ b/backend/plugins/gitlab/gitlab.go
@@ -31,7 +31,6 @@ func main() {
cmd := &cobra.Command{Use: "gitlab"}
projectId := cmd.Flags().IntP("project-id", "p", 0, "gitlab project id")
connectionId := cmd.Flags().Uint64P("connection-id", "c", 0, "gitlab
connection id")
- timeAfter := cmd.Flags().StringP("timeAfter", "a", "", "collect data
that are created after specified time, ie 2006-05-06T07:08:09Z")
_ = cmd.MarkFlagRequired("project-id")
_ = cmd.MarkFlagRequired("connection-id")
@@ -50,7 +49,6 @@ func main() {
runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
"projectId": *projectId,
"connectionId": *connectionId,
- "timeAfter": *timeAfter,
"scopeConfig": map[string]interface{}{
"prType": *prType,
"prComponent": *prComponent,
diff --git a/backend/plugins/gitlab/impl/impl.go
b/backend/plugins/gitlab/impl/impl.go
index 36c7c2ac2..1233802cd 100644
--- a/backend/plugins/gitlab/impl/impl.go
+++ b/backend/plugins/gitlab/impl/impl.go
@@ -19,7 +19,6 @@ package impl
import (
"fmt"
- "time"
"github.com/apache/incubator-devlake/helpers/pluginhelper/subtaskmeta/sorter"
@@ -77,9 +76,8 @@ func (p Gitlab) ScopeConfig() dal.Tabler {
func (p Gitlab) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
- return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes,
syncPolicy)
+ return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes)
}
func (p Gitlab) GetTablesInfo() []dal.Tabler {
@@ -151,14 +149,6 @@ func (p Gitlab) PrepareTaskData(taskCtx
plugin.TaskContext, options map[string]i
return nil, err
}
- var timeAfter time.Time
- if op.TimeAfter != "" {
- timeAfter, err = errors.Convert01(time.Parse(time.RFC3339,
op.TimeAfter))
- if err != nil {
- return nil, errors.BadInput.Wrap(err, "invalid value
for `timeAfter`")
- }
- }
-
if op.ProjectId != 0 {
var scope *models.GitlabProject
// support v100 & advance mode
@@ -212,10 +202,6 @@ func (p Gitlab) PrepareTaskData(taskCtx
plugin.TaskContext, options map[string]i
RegexEnricher: regexEnricher,
}
- if !timeAfter.IsZero() {
- taskData.TimeAfter = &timeAfter
- logger.Debug("collect data updated timeAfter %s", timeAfter)
- }
return &taskData, nil
}
diff --git a/backend/plugins/gitlab/tasks/issue_collector.go
b/backend/plugins/gitlab/tasks/issue_collector.go
index a8b51c5c6..425adc9f9 100644
--- a/backend/plugins/gitlab/tasks/issue_collector.go
+++ b/backend/plugins/gitlab/tasks/issue_collector.go
@@ -46,12 +46,13 @@ var CollectApiIssuesMeta = plugin.SubTaskMeta{
func CollectApiIssues(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_ISSUE_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
incremental := collectorWithState.IsIncremental()
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
@@ -63,8 +64,8 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
*/
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- if collectorWithState.TimeAfter != nil {
- query.Set("updated_after",
collectorWithState.TimeAfter.Format(time.RFC3339))
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+ query.Set("updated_after",
syncPolicy.TimeAfter.Format(time.RFC3339))
}
if incremental {
query.Set("updated_after",
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
diff --git a/backend/plugins/gitlab/tasks/job_collector.go
b/backend/plugins/gitlab/tasks/job_collector.go
index 6a7475784..755f8800b 100644
--- a/backend/plugins/gitlab/tasks/job_collector.go
+++ b/backend/plugins/gitlab/tasks/job_collector.go
@@ -55,7 +55,6 @@ func CollectApiJobs(taskCtx plugin.SubTaskContext)
errors.Error {
collector, err :=
helper.NewStatefulApiCollectorForFinalizableEntity(helper.FinalizableApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
- TimeAfter: data.TimeAfter, // set to nil to disable
timeFilter
CollectNewRecordsByList: helper.FinalizableApiCollectorListArgs{
PageSize: 100,
Concurrency: 10,
diff --git a/backend/plugins/gitlab/tasks/mr_collector.go
b/backend/plugins/gitlab/tasks/mr_collector.go
index 125262e60..ef991ae01 100644
--- a/backend/plugins/gitlab/tasks/mr_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_collector.go
@@ -43,12 +43,13 @@ var CollectApiMergeRequestsMeta = plugin.SubTaskMeta{
func CollectApiMergeRequests(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_MERGE_REQUEST_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
incremental := collectorWithState.IsIncremental()
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
@@ -61,8 +62,8 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return nil, err
}
- if collectorWithState.TimeAfter != nil {
- query.Set("updated_after",
collectorWithState.TimeAfter.Format(time.RFC3339))
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+ query.Set("updated_after",
syncPolicy.TimeAfter.Format(time.RFC3339))
}
if incremental {
query.Set("updated_after",
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
diff --git a/backend/plugins/gitlab/tasks/mr_commit_collector.go
b/backend/plugins/gitlab/tasks/mr_commit_collector.go
index f84a58f1d..2fbbdd48a 100644
--- a/backend/plugins/gitlab/tasks/mr_commit_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_commit_collector.go
@@ -40,7 +40,7 @@ var CollectApiMrCommitsMeta = plugin.SubTaskMeta{
func CollectApiMergeRequestsCommits(taskCtx plugin.SubTaskContext)
errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_MERGE_REQUEST_COMMITS_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
diff --git a/backend/plugins/gitlab/tasks/mr_detail_collector.go
b/backend/plugins/gitlab/tasks/mr_detail_collector.go
index 0b3971d3f..397d7f91f 100644
--- a/backend/plugins/gitlab/tasks/mr_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_detail_collector.go
@@ -44,7 +44,7 @@ var CollectApiMergeRequestDetailsMeta = plugin.SubTaskMeta{
func CollectApiMergeRequestDetails(taskCtx plugin.SubTaskContext) errors.Error
{
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_MERGE_REQUEST_DETAIL_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
@@ -77,6 +77,7 @@ func CollectApiMergeRequestDetails(taskCtx
plugin.SubTaskContext) errors.Error {
func GetMergeRequestDetailsIterator(taskCtx plugin.SubTaskContext,
collectorWithState *helper.ApiCollectorStateManager)
(*helper.DalCursorIterator, errors.Error) {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*GitlabTaskData)
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
clauses := []dal.Clause{
dal.Select("gmr.gitlab_id, gmr.iid"),
dal.From("_tool_gitlab_merge_requests gmr"),
@@ -87,8 +88,8 @@ func GetMergeRequestDetailsIterator(taskCtx
plugin.SubTaskContext, collectorWith
}
if collectorWithState.LatestState.LatestSuccessStart != nil {
clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.LatestState.LatestSuccessStart))
- } else if collectorWithState.TimeAfter != nil {
- clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.TimeAfter))
+ } else if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+ clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*syncPolicy.TimeAfter))
}
// construct the input iterator
cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/gitlab/tasks/mr_note_collector.go
b/backend/plugins/gitlab/tasks/mr_note_collector.go
index a4d5f40ff..d6fde5751 100644
--- a/backend/plugins/gitlab/tasks/mr_note_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_note_collector.go
@@ -40,7 +40,7 @@ var CollectApiMrNotesMeta = plugin.SubTaskMeta{
func CollectApiMergeRequestsNotes(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_MERGE_REQUEST_NOTES_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
diff --git a/backend/plugins/gitlab/tasks/pipeline_collector.go
b/backend/plugins/gitlab/tasks/pipeline_collector.go
index 1ef244887..f93d21655 100644
--- a/backend/plugins/gitlab/tasks/pipeline_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_collector.go
@@ -44,7 +44,7 @@ var CollectApiPipelinesMeta = plugin.SubTaskMeta{
func CollectApiPipelines(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_PIPELINE_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
@@ -54,6 +54,7 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
incremental := collectorWithState.IsIncremental()
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
@@ -63,8 +64,8 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext)
errors.Error {
UrlTemplate: "projects/{{ .Params.ProjectId
}}/pipelines",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- if collectorWithState.TimeAfter != nil {
- query.Set("updated_after",
collectorWithState.TimeAfter.Format(time.RFC3339))
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+ query.Set("updated_after",
syncPolicy.TimeAfter.Format(time.RFC3339))
}
if incremental {
query.Set("updated_after",
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
diff --git a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
index 856659adb..a37807b8c 100644
--- a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
@@ -45,7 +45,7 @@ var CollectApiPipelineDetailsMeta = plugin.SubTaskMeta{
func CollectApiPipelineDetails(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_PIPELINE_DETAILS_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
diff --git a/backend/plugins/gitlab/tasks/task_data.go
b/backend/plugins/gitlab/tasks/task_data.go
index a49d11389..45e4abe3b 100644
--- a/backend/plugins/gitlab/tasks/task_data.go
+++ b/backend/plugins/gitlab/tasks/task_data.go
@@ -18,8 +18,6 @@ limitations under the License.
package tasks
import (
- "time"
-
"github.com/apache/incubator-devlake/core/errors"
helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/gitlab/models"
@@ -29,7 +27,6 @@ type GitlabOptions struct {
ConnectionId uint64 `mapstructure:"connectionId"
json:"connectionId"`
ProjectId int `mapstructure:"projectId"
json:"projectId"`
ScopeConfigId uint64 `mapstructure:"scopeConfigId"
json:"scopeConfigId"`
- TimeAfter string `mapstructure:"timeAfter"
json:"timeAfter"`
ScopeConfig *models.GitlabScopeConfig `mapstructure:"scopeConfig"
json:"scopeConfig"`
}
@@ -37,7 +34,6 @@ type GitlabTaskData struct {
Options *GitlabOptions
ApiClient *helper.ApiAsyncClient
ProjectCommit *models.GitlabProjectCommit
- TimeAfter *time.Time
RegexEnricher *helper.RegexEnricher
}
diff --git a/backend/plugins/jenkins/api/blueprint_v200.go
b/backend/plugins/jenkins/api/blueprint_v200.go
index 7dca09a03..10940fb62 100644
--- a/backend/plugins/jenkins/api/blueprint_v200.go
+++ b/backend/plugins/jenkins/api/blueprint_v200.go
@@ -18,8 +18,6 @@ limitations under the License.
package api
import (
- "time"
-
"github.com/apache/incubator-devlake/core/errors"
coreModels "github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/models/domainlayer"
@@ -35,10 +33,9 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
plan := make(coreModels.PipelinePlan, len(bpScopes))
- plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connectionId, syncPolicy)
+ plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connectionId)
if err != nil {
return nil, nil, err
}
@@ -55,7 +52,6 @@ func makeDataSourcePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connectionId uint64,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, bpScope := range bpScopes {
stage := plan[i]
@@ -68,10 +64,6 @@ func makeDataSourcePipelinePlanV200(
options["scopeId"] = bpScope.ScopeId
options["connectionId"] = connectionId
- if syncPolicy.TimeAfter != nil {
- options["timeAfter"] =
syncPolicy.TimeAfter.Format(time.RFC3339)
- }
-
// get scope config from db
_, scopeConfig, err :=
scopeHelper.DbHelper().GetScopeAndConfig(connectionId, bpScope.ScopeId)
if err != nil {
diff --git a/backend/plugins/jenkins/api/blueprint_v200_test.go
b/backend/plugins/jenkins/api/blueprint_v200_test.go
index bfd61f573..561cc6c56 100644
--- a/backend/plugins/jenkins/api/blueprint_v200_test.go
+++ b/backend/plugins/jenkins/api/blueprint_v200_test.go
@@ -42,14 +42,14 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
bs := &coreModels.BlueprintScope{
ScopeId: "a/b/ccc",
}
- syncPolicy := &coreModels.SyncPolicy{}
+
bpScopes := make([]*coreModels.BlueprintScope, 0)
bpScopes = append(bpScopes, bs)
mockBasicRes(t)
plan := make(coreModels.PipelinePlan, len(bpScopes))
- plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes, 1,
syncPolicy)
+ plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes, 1)
assert.Nil(t, err)
scopes, err := makeScopesV200(bpScopes, 1)
assert.Nil(t, err)
diff --git a/backend/plugins/jenkins/impl/impl.go
b/backend/plugins/jenkins/impl/impl.go
index 01970c5dc..d0807fd64 100644
--- a/backend/plugins/jenkins/impl/impl.go
+++ b/backend/plugins/jenkins/impl/impl.go
@@ -20,7 +20,6 @@ package impl
import (
"fmt"
"strings"
- "time"
coreModels "github.com/apache/incubator-devlake/core/models"
@@ -135,13 +134,6 @@ func (p Jenkins) PrepareTaskData(taskCtx
plugin.TaskContext, options map[string]
return nil, err
}
- var timeAfter time.Time
- if op.TimeAfter != "" {
- timeAfter, err = errors.Convert01(time.Parse(time.RFC3339,
op.TimeAfter))
- if err != nil {
- return nil, errors.BadInput.Wrap(err, "invalid value
for `timeAfter`")
- }
- }
regexEnricher := helper.NewRegexEnricher()
if err := regexEnricher.TryAdd(devops.DEPLOYMENT,
op.ScopeConfig.DeploymentPattern); err != nil {
return nil, errors.BadInput.Wrap(err, "invalid value for
`deploymentPattern`")
@@ -155,10 +147,7 @@ func (p Jenkins) PrepareTaskData(taskCtx
plugin.TaskContext, options map[string]
Connection: connection,
RegexEnricher: regexEnricher,
}
- if !timeAfter.IsZero() {
- taskData.TimeAfter = &timeAfter
- logger.Debug("collect data created from %s", timeAfter)
- }
+
return taskData, nil
}
@@ -173,9 +162,8 @@ func (p Jenkins) MigrationScripts()
[]plugin.MigrationScript {
func (p Jenkins) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes)
}
func (p Jenkins) ApiResources()
map[string]map[string]plugin.ApiResourceHandler {
diff --git a/backend/plugins/jenkins/jenkins.go
b/backend/plugins/jenkins/jenkins.go
index 140e1167a..7b0c59523 100644
--- a/backend/plugins/jenkins/jenkins.go
+++ b/backend/plugins/jenkins/jenkins.go
@@ -30,14 +30,12 @@ func main() {
connectionId := jenkinsCmd.Flags().Uint64P("connection", "c", 1,
"jenkins connection id")
jobFullName := jenkinsCmd.Flags().StringP("jobFullName", "j", "",
"jenkins job full name")
deployTagPattern := jenkinsCmd.Flags().String("deployTagPattern",
"(?i)deploy", "deploy tag name")
- timeAfter := jenkinsCmd.Flags().StringP("timeAfter", "a", "", "collect
data that are created after specified time, ie 2006-05-06T07:08:09Z")
jenkinsCmd.Run = func(cmd *cobra.Command, args []string) {
runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
"connectionId": *connectionId,
"jobFullName": *jobFullName,
"deployTagPattern": *deployTagPattern,
- "timeAfter": *timeAfter,
})
}
runner.RunCmd(jenkinsCmd)
diff --git a/backend/plugins/jenkins/tasks/build_collector.go
b/backend/plugins/jenkins/tasks/build_collector.go
index 290d3a3e6..56924f350 100644
--- a/backend/plugins/jenkins/tasks/build_collector.go
+++ b/backend/plugins/jenkins/tasks/build_collector.go
@@ -66,7 +66,6 @@ func CollectApiBuilds(taskCtx plugin.SubTaskContext)
errors.Error {
Table: RAW_BUILD_TABLE,
},
ApiClient: data.ApiClient,
- TimeAfter: data.TimeAfter,
CollectNewRecordsByList: helper.FinalizableApiCollectorListArgs{
PageSize: 100,
Concurrency: 10,
diff --git a/backend/plugins/jenkins/tasks/stage_collector.go
b/backend/plugins/jenkins/tasks/stage_collector.go
index e6c4d1b30..6c8478750 100644
--- a/backend/plugins/jenkins/tasks/stage_collector.go
+++ b/backend/plugins/jenkins/tasks/stage_collector.go
@@ -54,9 +54,9 @@ func CollectApiStages(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where(`tjb.connection_id = ? and tjb.job_path = ? and
tjb.job_name = ? and tjb.class = ?`,
data.Options.ConnectionId, data.Options.JobPath,
data.Options.JobName, "WorkflowRun"),
}
- timeAfter := data.TimeAfter
- if timeAfter != nil {
- clauses = append(clauses, dal.Where(`tjb.start_time >= ?`,
timeAfter))
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+ clauses = append(clauses, dal.Where(`tjb.start_time >= ?`,
syncPolicy.TimeAfter))
}
cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/jenkins/tasks/task_data.go
b/backend/plugins/jenkins/tasks/task_data.go
index 3be060df7..d34b4e6ac 100644
--- a/backend/plugins/jenkins/tasks/task_data.go
+++ b/backend/plugins/jenkins/tasks/task_data.go
@@ -19,7 +19,6 @@ package tasks
import (
"strings"
- "time"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
@@ -30,11 +29,10 @@ type JenkinsApiParams models.JenkinsApiParams
type JenkinsOptions struct {
ConnectionId uint64 `json:"connectionId"`
ScopeId string
- ScopeConfigId uint64 `json:"scopeConfigId"`
- JobFullName string `json:"jobFullName"` // "path1/path2/job name"
- JobName string `json:"jobName"` // "job name"
- JobPath string `json:"jobPath"` // "job/path1/job/path2"
- TimeAfter string
+ ScopeConfigId uint64 `json:"scopeConfigId"`
+ JobFullName string `json:"jobFullName"` //
"path1/path2/job name"
+ JobName string `json:"jobName"` // "job
name"
+ JobPath string `json:"jobPath"` //
"job/path1/job/path2"
Tasks []string `json:"tasks,omitempty"`
ScopeConfig *models.JenkinsScopeConfig `mapstructure:"scopeConfig"
json:"scopeConfig"`
}
@@ -43,7 +41,6 @@ type JenkinsTaskData struct {
Options *JenkinsOptions
ApiClient *api.ApiAsyncClient
Connection *models.JenkinsConnection
- TimeAfter *time.Time
RegexEnricher *api.RegexEnricher
}
diff --git a/backend/plugins/jira/api/blueprint_v200.go
b/backend/plugins/jira/api/blueprint_v200.go
index 20be17970..d67b86b98 100644
--- a/backend/plugins/jira/api/blueprint_v200.go
+++ b/backend/plugins/jira/api/blueprint_v200.go
@@ -18,8 +18,6 @@ limitations under the License.
package api
import (
- "time"
-
"github.com/apache/incubator-devlake/core/errors"
coreModels "github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/models/domainlayer"
@@ -35,10 +33,9 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
plan := make(coreModels.PipelinePlan, len(bpScopes))
- plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connectionId, syncPolicy)
+ plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connectionId)
if err != nil {
return nil, nil, err
}
@@ -55,7 +52,6 @@ func makeDataSourcePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connectionId uint64,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, bpScope := range bpScopes {
stage := plan[i]
@@ -66,9 +62,6 @@ func makeDataSourcePipelinePlanV200(
options := make(map[string]interface{})
options["scopeId"] = bpScope.ScopeId
options["connectionId"] = connectionId
- if syncPolicy.TimeAfter != nil {
- options["timeAfter"] =
syncPolicy.TimeAfter.Format(time.RFC3339)
- }
// get scope config from db
_, scopeConfig, err :=
scopeHelper.DbHelper().GetScopeAndConfig(connectionId, bpScope.ScopeId)
diff --git a/backend/plugins/jira/api/blueprint_v200_test.go
b/backend/plugins/jira/api/blueprint_v200_test.go
index 76b9306f6..81c44186b 100644
--- a/backend/plugins/jira/api/blueprint_v200_test.go
+++ b/backend/plugins/jira/api/blueprint_v200_test.go
@@ -42,13 +42,13 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
bs := &coreModels.BlueprintScope{
ScopeId: "10",
}
- syncPolicy := &coreModels.SyncPolicy{}
+
bpScopes := make([]*coreModels.BlueprintScope, 0)
bpScopes = append(bpScopes, bs)
plan := make(coreModels.PipelinePlan, len(bpScopes))
mockBasicRes(t)
- plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes,
uint64(1), syncPolicy)
+ plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes,
uint64(1))
assert.Nil(t, err)
scopes, err := makeScopesV200(bpScopes, uint64(1))
assert.Nil(t, err)
diff --git a/backend/plugins/jira/impl/impl.go
b/backend/plugins/jira/impl/impl.go
index 432a4142a..20e6a052f 100644
--- a/backend/plugins/jira/impl/impl.go
+++ b/backend/plugins/jira/impl/impl.go
@@ -20,7 +20,6 @@ package impl
import (
"fmt"
"net/http"
- "time"
"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/dal"
@@ -250,24 +249,15 @@ func (p Jira) PrepareTaskData(taskCtx plugin.TaskContext,
options map[string]int
ApiClient: jiraApiClient,
JiraServerInfo: *info,
}
- if op.TimeAfter != "" {
- var timeAfter time.Time
- timeAfter, err = errors.Convert01(time.Parse(time.RFC3339,
op.TimeAfter))
- if err != nil {
- return nil, errors.BadInput.Wrap(err, "invalid value
for `timeAfter`")
- }
- taskData.TimeAfter = &timeAfter
- logger.Debug("collect data created from %s", timeAfter)
- }
+
return taskData, nil
}
func (p Jira) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes)
}
func (p Jira) RootPkgPath() string {
diff --git a/backend/plugins/jira/jira.go b/backend/plugins/jira/jira.go
index 208b4c82e..fdb2dfa61 100644
--- a/backend/plugins/jira/jira.go
+++ b/backend/plugins/jira/jira.go
@@ -32,12 +32,11 @@ func main() {
boardId := cmd.Flags().Uint64P("board", "b", 0, "jira board id")
_ = cmd.MarkFlagRequired("connection")
_ = cmd.MarkFlagRequired("board")
- timeAfter := cmd.Flags().StringP("timeAfter", "a", "", "collect data
that are created after specified time, ie 2006-05-06T07:08:09Z")
+
cmd.Run = func(c *cobra.Command, args []string) {
runner.DirectRun(c, args, PluginEntry, map[string]interface{}{
"connectionId": *connectionId,
"boardId": *boardId,
- "timeAfter": *timeAfter,
})
}
runner.RunCmd(cmd)
diff --git a/backend/plugins/jira/tasks/development_panel_collector.go
b/backend/plugins/jira/tasks/development_panel_collector.go
index 44d65f627..278052cdc 100644
--- a/backend/plugins/jira/tasks/development_panel_collector.go
+++ b/backend/plugins/jira/tasks/development_panel_collector.go
@@ -61,7 +61,7 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext)
errors.Error {
},
Table: RAW_DEVELOPMENT_PANEL,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
diff --git a/backend/plugins/jira/tasks/issue_changelog_collector.go
b/backend/plugins/jira/tasks/issue_changelog_collector.go
index 99f95ffa6..7b5fc7674 100644
--- a/backend/plugins/jira/tasks/issue_changelog_collector.go
+++ b/backend/plugins/jira/tasks/issue_changelog_collector.go
@@ -60,7 +60,7 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
BoardId: data.Options.BoardId,
},
Table: RAW_CHANGELOG_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
diff --git a/backend/plugins/jira/tasks/issue_collector.go
b/backend/plugins/jira/tasks/issue_collector.go
index 19f9259c9..a6cf3b8ea 100644
--- a/backend/plugins/jira/tasks/issue_collector.go
+++ b/backend/plugins/jira/tasks/issue_collector.go
@@ -20,13 +20,14 @@ package tasks
import (
"encoding/json"
"fmt"
- "github.com/apache/incubator-devlake/core/dal"
- "github.com/apache/incubator-devlake/plugins/jira/models"
"io"
"net/http"
"net/url"
"time"
+ "github.com/apache/incubator-devlake/core/dal"
+ "github.com/apache/incubator-devlake/plugins/jira/models"
+
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
@@ -61,7 +62,7 @@ func CollectIssues(taskCtx plugin.SubTaskContext)
errors.Error {
Table store raw data
*/
Table: RAW_ISSUE_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
@@ -76,7 +77,14 @@ func CollectIssues(taskCtx plugin.SubTaskContext)
errors.Error {
} else {
logger.Info("got user's timezone: %v", loc.String())
}
- jql := buildJQL(data.TimeAfter,
collectorWithState.LatestState.LatestSuccessStart, incremental, loc)
+
+ jql := ""
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+ jql = buildJQL(syncPolicy.TimeAfter,
collectorWithState.LatestState.LatestSuccessStart, incremental, loc)
+ } else {
+ jql = buildJQL(nil,
collectorWithState.LatestState.LatestSuccessStart, incremental, loc)
+ }
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
diff --git a/backend/plugins/jira/tasks/issue_comment_collector.go
b/backend/plugins/jira/tasks/issue_comment_collector.go
index c7879d4e2..fb0d2168c 100644
--- a/backend/plugins/jira/tasks/issue_comment_collector.go
+++ b/backend/plugins/jira/tasks/issue_comment_collector.go
@@ -60,7 +60,7 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext)
errors.Error {
BoardId: data.Options.BoardId,
},
Table: RAW_ISSUE_COMMENT_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
diff --git a/backend/plugins/jira/tasks/remotelink_collector.go
b/backend/plugins/jira/tasks/remotelink_collector.go
index e3b141053..158640d5c 100644
--- a/backend/plugins/jira/tasks/remotelink_collector.go
+++ b/backend/plugins/jira/tasks/remotelink_collector.go
@@ -59,7 +59,7 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext)
errors.Error {
BoardId: data.Options.BoardId,
},
Table: RAW_REMOTELINK_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
diff --git a/backend/plugins/jira/tasks/task_data.go
b/backend/plugins/jira/tasks/task_data.go
index 13c8f00c2..3a9b01d26 100644
--- a/backend/plugins/jira/tasks/task_data.go
+++ b/backend/plugins/jira/tasks/task_data.go
@@ -19,7 +19,6 @@ package tasks
import (
"fmt"
- "time"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
@@ -27,9 +26,8 @@ import (
)
type JiraOptions struct {
- ConnectionId uint64 `json:"connectionId"`
- BoardId uint64 `json:"boardId"`
- TimeAfter string
+ ConnectionId uint64 `json:"connectionId"`
+ BoardId uint64 `json:"boardId"`
ScopeConfig *models.JiraScopeConfig `json:"scopeConfig"`
ScopeId string
ScopeConfigId uint64
@@ -39,7 +37,6 @@ type JiraOptions struct {
type JiraTaskData struct {
Options *JiraOptions
ApiClient *api.ApiAsyncClient
- TimeAfter *time.Time
JiraServerInfo models.JiraServerInfo
}
diff --git a/backend/plugins/jira/tasks/worklog_collector.go
b/backend/plugins/jira/tasks/worklog_collector.go
index bd536eb47..d0feab119 100644
--- a/backend/plugins/jira/tasks/worklog_collector.go
+++ b/backend/plugins/jira/tasks/worklog_collector.go
@@ -52,7 +52,7 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
BoardId: data.Options.BoardId,
},
Table: RAW_WORKLOGS_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
diff --git a/backend/plugins/pagerduty/api/blueprint_v200.go
b/backend/plugins/pagerduty/api/blueprint_v200.go
index c213e4755..a1c8042fe 100644
--- a/backend/plugins/pagerduty/api/blueprint_v200.go
+++ b/backend/plugins/pagerduty/api/blueprint_v200.go
@@ -18,8 +18,6 @@ limitations under the License.
package api
import (
- "time"
-
"github.com/apache/incubator-devlake/core/errors"
coreModels "github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/models/domainlayer"
@@ -36,7 +34,6 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
// get the connection info for url
connection := &models.PagerDutyConnection{}
@@ -46,7 +43,7 @@ func MakeDataSourcePipelinePlanV200(
}
plan := make(coreModels.PipelinePlan, len(bpScopes))
- plan, err = makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connection, syncPolicy)
+ plan, err = makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connection)
if err != nil {
return nil, nil, err
}
@@ -63,7 +60,6 @@ func makeDataSourcePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connection *models.PagerDutyConnection,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, bpScope := range bpScopes {
// get board and scope config from db
@@ -77,9 +73,7 @@ func makeDataSourcePipelinePlanV200(
ServiceId: service.Id,
ServiceName: service.Name,
}
- if syncPolicy.TimeAfter != nil {
- op.TimeAfter = syncPolicy.TimeAfter.Format(time.RFC3339)
- }
+
var options map[string]any
options, err = tasks.EncodeTaskOptions(op)
if err != nil {
diff --git a/backend/plugins/pagerduty/impl/impl.go
b/backend/plugins/pagerduty/impl/impl.go
index 14737971d..273e578b3 100644
--- a/backend/plugins/pagerduty/impl/impl.go
+++ b/backend/plugins/pagerduty/impl/impl.go
@@ -19,7 +19,6 @@ package impl
import (
"fmt"
- "time"
"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/dal"
@@ -108,14 +107,7 @@ func (p PagerDuty) PrepareTaskData(taskCtx
plugin.TaskContext, options map[strin
if err != nil {
return nil, errors.Default.Wrap(err, "unable to get Pagerduty
connection by the given connection ID")
}
- var timeAfter *time.Time
- if op.TimeAfter != "" {
- convertedTime, err := errors.Convert01(time.Parse(time.RFC3339,
op.TimeAfter))
- if err != nil {
- return nil, errors.BadInput.Wrap(err,
fmt.Sprintf("invalid value for `timeAfter`: %s", timeAfter))
- }
- timeAfter = &convertedTime
- }
+
client, err := helper.NewApiClient(taskCtx.GetContext(),
connection.Endpoint, map[string]string{
"Authorization": fmt.Sprintf("Token %s", connection.Token),
}, 0, connection.Proxy, taskCtx)
@@ -127,9 +119,8 @@ func (p PagerDuty) PrepareTaskData(taskCtx
plugin.TaskContext, options map[strin
return nil, err
}
return &tasks.PagerDutyTaskData{
- Options: op,
- TimeAfter: timeAfter,
- Client: asyncClient,
+ Options: op,
+ Client: asyncClient,
}, nil
}
@@ -177,9 +168,8 @@ func (p PagerDuty) ApiResources()
map[string]map[string]plugin.ApiResourceHandle
func (p PagerDuty) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes)
}
func (p PagerDuty) Close(taskCtx plugin.TaskContext) errors.Error {
diff --git a/backend/plugins/pagerduty/tasks/incidents_collector.go
b/backend/plugins/pagerduty/tasks/incidents_collector.go
index d8cae2f1f..91068bf5b 100644
--- a/backend/plugins/pagerduty/tasks/incidents_collector.go
+++ b/backend/plugins/pagerduty/tasks/incidents_collector.go
@@ -69,7 +69,6 @@ func CollectIncidents(taskCtx plugin.SubTaskContext)
errors.Error {
collector, err :=
api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
RawDataSubTaskArgs: args,
ApiClient: data.Client,
- TimeAfter: data.TimeAfter,
CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
PageSize: 100,
GetNextPageCustomData: func(prevReqData
*api.RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) {
diff --git a/backend/plugins/pagerduty/tasks/task_data.go
b/backend/plugins/pagerduty/tasks/task_data.go
index 894628073..5f456f725 100644
--- a/backend/plugins/pagerduty/tasks/task_data.go
+++ b/backend/plugins/pagerduty/tasks/task_data.go
@@ -18,8 +18,6 @@ limitations under the License.
package tasks
import (
- "time"
-
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/pagerduty/models"
@@ -27,7 +25,6 @@ import (
type PagerDutyOptions struct {
ConnectionId uint64 `json:"connectionId"`
- TimeAfter string `json:"time_after,omitempty"`
ServiceId string `json:"service_id,omitempty"`
ServiceName string `json:"service_name,omitempty"`
Tasks []string `json:"tasks,omitempty"`
@@ -35,9 +32,8 @@ type PagerDutyOptions struct {
}
type PagerDutyTaskData struct {
- Options *PagerDutyOptions
- TimeAfter *time.Time
- Client api.RateLimitedApiClient
+ Options *PagerDutyOptions
+ Client api.RateLimitedApiClient
}
func (p *PagerDutyOptions) GetParams() any {
diff --git a/backend/plugins/sonarqube/api/blueprint_v200.go
b/backend/plugins/sonarqube/api/blueprint_v200.go
index 95f886c20..b1148e530 100644
--- a/backend/plugins/sonarqube/api/blueprint_v200.go
+++ b/backend/plugins/sonarqube/api/blueprint_v200.go
@@ -38,10 +38,9 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
plan := make(coreModels.PipelinePlan, len(bpScopes))
- plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connectionId, syncPolicy)
+ plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connectionId)
if err != nil {
return nil, nil, err
}
@@ -58,7 +57,6 @@ func makeDataSourcePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connectionId uint64,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, bpScope := range bpScopes {
stage := plan[i]
diff --git a/backend/plugins/sonarqube/api/blueprint_v200_test.go
b/backend/plugins/sonarqube/api/blueprint_v200_test.go
index fd1937119..093355e8a 100644
--- a/backend/plugins/sonarqube/api/blueprint_v200_test.go
+++ b/backend/plugins/sonarqube/api/blueprint_v200_test.go
@@ -41,11 +41,11 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
bs := &coreModels.BlueprintScope{
ScopeId: "f5a50c63-2e8f-4107-9014-853f6f467757",
}
- syncPolicy := &coreModels.SyncPolicy{}
+
bpScopes := make([]*coreModels.BlueprintScope, 0)
bpScopes = append(bpScopes, bs)
plan := make(coreModels.PipelinePlan, len(bpScopes))
- plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes,
uint64(1), syncPolicy)
+ plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes,
uint64(1))
assert.Nil(t, err)
basicRes = NewMockBasicRes()
diff --git a/backend/plugins/sonarqube/impl/impl.go
b/backend/plugins/sonarqube/impl/impl.go
index ac2743104..28496b180 100644
--- a/backend/plugins/sonarqube/impl/impl.go
+++ b/backend/plugins/sonarqube/impl/impl.go
@@ -200,9 +200,8 @@ func (p Sonarqube) ApiResources()
map[string]map[string]plugin.ApiResourceHandle
func (p Sonarqube) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes)
}
func (p Sonarqube) Close(taskCtx plugin.TaskContext) errors.Error {
diff --git a/backend/plugins/tapd/api/blueprint_v200.go
b/backend/plugins/tapd/api/blueprint_v200.go
index 41aa2cfa4..8ab9cb0cb 100644
--- a/backend/plugins/tapd/api/blueprint_v200.go
+++ b/backend/plugins/tapd/api/blueprint_v200.go
@@ -19,7 +19,6 @@ package api
import (
"strconv"
- "time"
"github.com/apache/incubator-devlake/core/errors"
coreModels "github.com/apache/incubator-devlake/core/models"
@@ -36,10 +35,9 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
plan := make(coreModels.PipelinePlan, len(bpScopes))
- plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connectionId, syncPolicy)
+ plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connectionId)
if err != nil {
return nil, nil, err
}
@@ -56,7 +54,6 @@ func makeDataSourcePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connectionId uint64,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, bpScope := range bpScopes {
stage := plan[i]
@@ -71,9 +68,6 @@ func makeDataSourcePipelinePlanV200(
}
options["workspaceId"] = intNum
options["connectionId"] = connectionId
- if syncPolicy.TimeAfter != nil {
- options["timeAfter"] =
syncPolicy.TimeAfter.Format(time.RFC3339)
- }
_, scopeConfig, err :=
scopeHelper.DbHelper().GetScopeAndConfig(connectionId, bpScope.ScopeId)
if err != nil {
diff --git a/backend/plugins/tapd/api/blueprint_v200_test.go
b/backend/plugins/tapd/api/blueprint_v200_test.go
index dac1d9a43..bb3a17eec 100644
--- a/backend/plugins/tapd/api/blueprint_v200_test.go
+++ b/backend/plugins/tapd/api/blueprint_v200_test.go
@@ -42,13 +42,12 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
bs := &coreModels.BlueprintScope{
ScopeId: "10",
}
- syncPolicy := &coreModels.SyncPolicy{}
bpScopes := make([]*coreModels.BlueprintScope, 0)
bpScopes = append(bpScopes, bs)
plan := make(coreModels.PipelinePlan, len(bpScopes))
mockBasicRes(t)
- plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes,
uint64(1), syncPolicy)
+ plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes,
uint64(1))
assert.Nil(t, err)
scopes, err := makeScopesV200(bpScopes, uint64(1))
assert.Nil(t, err)
diff --git a/backend/plugins/tapd/impl/impl.go
b/backend/plugins/tapd/impl/impl.go
index f72a81a4c..04ec1d62d 100644
--- a/backend/plugins/tapd/impl/impl.go
+++ b/backend/plugins/tapd/impl/impl.go
@@ -255,24 +255,14 @@ func (p Tapd) PrepareTaskData(taskCtx plugin.TaskContext,
options map[string]int
ApiClient: tapdApiClient,
Connection: connection,
}
- if op.TimeAfter != "" {
- var timeAfter time.Time
- timeAfter, err = errors.Convert01(time.Parse(time.RFC3339,
op.TimeAfter))
- if err != nil {
- return nil, errors.BadInput.Wrap(err, "invalid value
for `timeAfter`")
- }
- taskData.TimeAfter = &timeAfter
- logger.Debug("collect data updated timeAfter %s", timeAfter)
- }
return taskData, nil
}
func (p Tapd) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes)
}
func (p Tapd) RootPkgPath() string {
diff --git a/backend/plugins/tapd/tasks/bug_changelog_collector.go
b/backend/plugins/tapd/tasks/bug_changelog_collector.go
index 32992091c..b967cfa78 100644
--- a/backend/plugins/tapd/tasks/bug_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/bug_changelog_collector.go
@@ -34,11 +34,12 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_BUG_CHANGELOG_TABLE)
logger := taskCtx.GetLogger()
logger.Info("collect storyChangelogs")
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
incremental := collectorWithState.IsIncremental()
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
Incremental: incremental,
@@ -51,10 +52,10 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created desc")
- if data.TimeAfter != nil {
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
query.Set("created",
fmt.Sprintf(">%s",
-
data.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
}
if incremental {
query.Set("created",
diff --git a/backend/plugins/tapd/tasks/bug_collector.go
b/backend/plugins/tapd/tasks/bug_collector.go
index 530c5ff04..3265d320f 100644
--- a/backend/plugins/tapd/tasks/bug_collector.go
+++ b/backend/plugins/tapd/tasks/bug_collector.go
@@ -19,10 +19,11 @@ package tasks
import (
"fmt"
+ "net/url"
+
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
- "net/url"
)
const RAW_BUG_TABLE = "tapd_api_bugs"
@@ -33,12 +34,12 @@ func CollectBugs(taskCtx plugin.SubTaskContext)
errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_BUG_TABLE)
logger := taskCtx.GetLogger()
logger.Info("collect bugs")
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
incremental := collectorWithState.IsIncremental()
-
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
Incremental: incremental,
ApiClient: data.ApiClient,
@@ -51,10 +52,10 @@ func CollectBugs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if data.TimeAfter != nil {
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
query.Set("modified",
fmt.Sprintf(">%s",
-
data.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
}
if incremental {
query.Set("modified",
diff --git a/backend/plugins/tapd/tasks/bug_commit_collector.go
b/backend/plugins/tapd/tasks/bug_commit_collector.go
index 26a8e1e11..3209b6bf2 100644
--- a/backend/plugins/tapd/tasks/bug_commit_collector.go
+++ b/backend/plugins/tapd/tasks/bug_commit_collector.go
@@ -20,14 +20,15 @@ package tasks
import (
"encoding/json"
"fmt"
+ "net/http"
+ "net/url"
+ "reflect"
+
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/tapd/models"
- "net/http"
- "net/url"
- "reflect"
)
const RAW_BUG_COMMIT_TABLE = "tapd_api_bug_commits"
@@ -37,20 +38,21 @@ var _ plugin.SubTaskEntryPoint = CollectBugCommits
func CollectBugCommits(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_BUG_COMMIT_TABLE)
db := taskCtx.GetDal()
- collectorWithState, err :=
api.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
logger := taskCtx.GetLogger()
logger.Info("collect issueCommits")
incremental := collectorWithState.IsIncremental()
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
clauses := []dal.Clause{
dal.Select("_tool_tapd_bugs.id as issue_id, modified as
update_time"),
dal.From(&models.TapdBug{}),
dal.Where("_tool_tapd_bugs.connection_id = ? and
_tool_tapd_bugs.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if collectorWithState.TimeAfter != nil {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.TimeAfter))
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+ clauses = append(clauses, dal.Where("modified > ?",
*syncPolicy.TimeAfter))
}
if incremental {
clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.LatestState.LatestSuccessStart))
diff --git a/backend/plugins/tapd/tasks/iteration_collector.go
b/backend/plugins/tapd/tasks/iteration_collector.go
index 7e18030f9..fe21fbc4f 100644
--- a/backend/plugins/tapd/tasks/iteration_collector.go
+++ b/backend/plugins/tapd/tasks/iteration_collector.go
@@ -36,11 +36,12 @@ func CollectIterations(taskCtx plugin.SubTaskContext)
errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_ITERATION_TABLE)
logger := taskCtx.GetLogger()
logger.Info("collect iterations")
- collectorWithState, err :=
api.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
incremental := collectorWithState.IsIncremental()
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
Incremental: incremental,
ApiClient: data.ApiClient,
@@ -53,10 +54,10 @@ func CollectIterations(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if data.TimeAfter != nil {
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
query.Set("modified",
fmt.Sprintf(">%s",
-
data.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
}
if incremental {
query.Set("modified",
diff --git a/backend/plugins/tapd/tasks/story_bug_collector.go
b/backend/plugins/tapd/tasks/story_bug_collector.go
index 0115399f5..cfb27fa69 100644
--- a/backend/plugins/tapd/tasks/story_bug_collector.go
+++ b/backend/plugins/tapd/tasks/story_bug_collector.go
@@ -19,13 +19,14 @@ package tasks
import (
"fmt"
+ "net/url"
+ "reflect"
+
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/tapd/models"
- "net/url"
- "reflect"
)
const RAW_STORY_BUG_TABLE = "tapd_api_story_bugs"
@@ -35,20 +36,21 @@ var _ plugin.SubTaskEntryPoint = CollectStoryBugs
func CollectStoryBugs(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_STORY_BUG_TABLE)
db := taskCtx.GetDal()
- collectorWithState, err :=
api.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
logger := taskCtx.GetLogger()
logger.Info("collect storyBugs")
incremental := collectorWithState.IsIncremental()
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
clauses := []dal.Clause{
dal.Select("id as issue_id, modified as update_time"),
dal.From(&models.TapdStory{}),
dal.Where("_tool_tapd_stories.connection_id = ? and
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if collectorWithState.TimeAfter != nil {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.TimeAfter))
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+ clauses = append(clauses, dal.Where("modified > ?",
*syncPolicy.TimeAfter))
}
if incremental {
clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.LatestState.LatestSuccessStart))
diff --git a/backend/plugins/tapd/tasks/story_changelog_collector.go
b/backend/plugins/tapd/tasks/story_changelog_collector.go
index c7ef61407..a0bad43bd 100644
--- a/backend/plugins/tapd/tasks/story_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/story_changelog_collector.go
@@ -32,13 +32,14 @@ var _ plugin.SubTaskEntryPoint = CollectStoryChangelogs
func CollectStoryChangelogs(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_STORY_CHANGELOG_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
logger := taskCtx.GetLogger()
logger.Info("collect storyChangelogs")
incremental := collectorWithState.IsIncremental()
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
Incremental: incremental,
@@ -51,10 +52,10 @@ func CollectStoryChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if data.TimeAfter != nil {
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
query.Set("created",
fmt.Sprintf(">%s",
-
data.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
}
if incremental {
query.Set("created",
diff --git a/backend/plugins/tapd/tasks/story_collector.go
b/backend/plugins/tapd/tasks/story_collector.go
index e6df82388..e88dd87ac 100644
--- a/backend/plugins/tapd/tasks/story_collector.go
+++ b/backend/plugins/tapd/tasks/story_collector.go
@@ -34,11 +34,12 @@ func CollectStorys(taskCtx plugin.SubTaskContext)
errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_STORY_TABLE)
logger := taskCtx.GetLogger()
logger.Info("collect stories")
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
incremental := collectorWithState.IsIncremental()
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
Incremental: incremental,
ApiClient: data.ApiClient,
@@ -51,10 +52,10 @@ func CollectStorys(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if data.TimeAfter != nil {
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
query.Set("modified",
fmt.Sprintf(">%s",
-
data.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
}
if incremental {
query.Set("modified",
diff --git a/backend/plugins/tapd/tasks/story_commit_collector.go
b/backend/plugins/tapd/tasks/story_commit_collector.go
index 1b0f63505..8e2ad026f 100644
--- a/backend/plugins/tapd/tasks/story_commit_collector.go
+++ b/backend/plugins/tapd/tasks/story_commit_collector.go
@@ -20,14 +20,15 @@ package tasks
import (
"encoding/json"
"fmt"
+ "net/http"
+ "net/url"
+ "reflect"
+
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/tapd/models"
- "net/http"
- "net/url"
- "reflect"
)
const RAW_STORY_COMMIT_TABLE = "tapd_api_story_commits"
@@ -37,20 +38,21 @@ var _ plugin.SubTaskEntryPoint = CollectStoryCommits
func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_STORY_COMMIT_TABLE)
db := taskCtx.GetDal()
- collectorWithState, err :=
api.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
logger := taskCtx.GetLogger()
logger.Info("collect issueCommits")
incremental := collectorWithState.IsIncremental()
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
clauses := []dal.Clause{
dal.Select("_tool_tapd_stories.id as issue_id, modified as
update_time"),
dal.From(&models.TapdStory{}),
dal.Where("_tool_tapd_stories.connection_id = ? and
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if collectorWithState.TimeAfter != nil {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.TimeAfter))
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+ clauses = append(clauses, dal.Where("modified > ?",
*syncPolicy.TimeAfter))
}
if incremental {
clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.LatestState.LatestSuccessStart))
diff --git a/backend/plugins/tapd/tasks/task_changelog_collector.go
b/backend/plugins/tapd/tasks/task_changelog_collector.go
index 7f52ea28c..9a457c7bd 100644
--- a/backend/plugins/tapd/tasks/task_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/task_changelog_collector.go
@@ -32,14 +32,14 @@ var _ plugin.SubTaskEntryPoint = CollectTaskChangelogs
func CollectTaskChangelogs(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_TASK_CHANGELOG_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
logger := taskCtx.GetLogger()
logger.Info("collect taskChangelogs")
incremental := collectorWithState.IsIncremental()
-
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
Incremental: incremental,
ApiClient: data.ApiClient,
@@ -51,10 +51,10 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if data.TimeAfter != nil {
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
query.Set("created",
fmt.Sprintf(">%s",
-
data.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
}
if incremental {
query.Set("created",
diff --git a/backend/plugins/tapd/tasks/task_collector.go
b/backend/plugins/tapd/tasks/task_collector.go
index f0da6d10b..36d505e5c 100644
--- a/backend/plugins/tapd/tasks/task_collector.go
+++ b/backend/plugins/tapd/tasks/task_collector.go
@@ -34,12 +34,12 @@ func CollectTasks(taskCtx plugin.SubTaskContext)
errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_TASK_TABLE)
logger := taskCtx.GetLogger()
logger.Info("collect tasks")
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
incremental := collectorWithState.IsIncremental()
-
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
Incremental: incremental,
@@ -53,10 +53,10 @@ func CollectTasks(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if data.TimeAfter != nil {
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
query.Set("modified",
fmt.Sprintf(">%s",
-
data.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
}
if incremental {
query.Set("modified",
diff --git a/backend/plugins/tapd/tasks/task_commit_collector.go
b/backend/plugins/tapd/tasks/task_commit_collector.go
index 1a73a870c..8eae8e001 100644
--- a/backend/plugins/tapd/tasks/task_commit_collector.go
+++ b/backend/plugins/tapd/tasks/task_commit_collector.go
@@ -20,14 +20,15 @@ package tasks
import (
"encoding/json"
"fmt"
+ "net/http"
+ "net/url"
+ "reflect"
+
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/tapd/models"
- "net/http"
- "net/url"
- "reflect"
)
const RAW_TASK_COMMIT_TABLE = "tapd_api_task_commits"
@@ -37,20 +38,21 @@ var _ plugin.SubTaskEntryPoint = CollectTaskCommits
func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_TASK_COMMIT_TABLE)
db := taskCtx.GetDal()
- collectorWithState, err :=
api.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
logger := taskCtx.GetLogger()
logger.Info("collect issueCommits")
incremental := collectorWithState.IsIncremental()
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
clauses := []dal.Clause{
dal.Select("_tool_tapd_tasks.id as issue_id, modified as
update_time"),
dal.From(&models.TapdTask{}),
dal.Where("_tool_tapd_tasks.connection_id = ? and
_tool_tapd_tasks.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if collectorWithState.TimeAfter != nil {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.TimeAfter))
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+ clauses = append(clauses, dal.Where("modified > ?",
*syncPolicy.TimeAfter))
}
if incremental {
clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.LatestState.LatestSuccessStart))
diff --git a/backend/plugins/tapd/tasks/task_data.go
b/backend/plugins/tapd/tasks/task_data.go
index fc892f0fe..16b077bbb 100644
--- a/backend/plugins/tapd/tasks/task_data.go
+++ b/backend/plugins/tapd/tasks/task_data.go
@@ -29,7 +29,6 @@ type TapdOptions struct {
ConnectionId uint64 `mapstruct:"connectionId"`
WorkspaceId uint64 `mapstruct:"workspaceId"`
PageSize uint64 `mapstruct:"pageSize"`
- TimeAfter string `json:"timeAfter"
mapstructure:"timeAfter,omitempty"`
CstZone *time.Location
ScopeConfigId uint64
ScopeConfig *models.TapdScopeConfig `json:"scopeConfig"`
@@ -38,7 +37,6 @@ type TapdOptions struct {
type TapdTaskData struct {
Options *TapdOptions
ApiClient *helper.ApiAsyncClient
- TimeAfter *time.Time
Connection *models.TapdConnection
}
diff --git a/backend/plugins/tapd/tasks/worklog_collector.go
b/backend/plugins/tapd/tasks/worklog_collector.go
index 076b682fb..845fbe161 100644
--- a/backend/plugins/tapd/tasks/worklog_collector.go
+++ b/backend/plugins/tapd/tasks/worklog_collector.go
@@ -34,12 +34,12 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_WORKLOG_TABLE)
logger := taskCtx.GetLogger()
logger.Info("collect worklogs")
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+ collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
incremental := collectorWithState.IsIncremental()
-
+ syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
Incremental: incremental,
ApiClient: data.ApiClient,
@@ -51,10 +51,10 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if data.TimeAfter != nil {
+ if syncPolicy != nil && syncPolicy.TimeAfter != nil {
query.Set("modified",
fmt.Sprintf(">%s",
-
data.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
}
if incremental {
query.Set("modified",
diff --git a/backend/plugins/teambition/api/blueprint200.go
b/backend/plugins/teambition/api/blueprint200.go
index 6ee599725..4c5611b7c 100644
--- a/backend/plugins/teambition/api/blueprint200.go
+++ b/backend/plugins/teambition/api/blueprint200.go
@@ -19,7 +19,6 @@ package api
import (
"fmt"
- "time"
"github.com/apache/incubator-devlake/plugins/teambition/models"
@@ -37,10 +36,9 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
plan := make(coreModels.PipelinePlan, len(bpScopes))
- plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connectionId, syncPolicy)
+ plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan,
bpScopes, connectionId)
if err != nil {
return nil, nil, err
}
@@ -57,7 +55,6 @@ func makeDataSourcePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connectionId uint64,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, errors.Error) {
for i, bpScope := range bpScopes {
stage := plan[i]
@@ -68,9 +65,6 @@ func makeDataSourcePipelinePlanV200(
options := make(map[string]interface{})
options["projectId"] = bpScope.ScopeId
options["connectionId"] = connectionId
- if syncPolicy.TimeAfter != nil {
- options["timeAfter"] =
syncPolicy.TimeAfter.Format(time.RFC3339)
- }
subtasks, err := helper.MakePipelinePlanSubtasks(subtaskMetas,
plugin.DOMAIN_TYPES)
if err != nil {
diff --git a/backend/plugins/teambition/impl/impl.go
b/backend/plugins/teambition/impl/impl.go
index 2aeb2f11a..410dfd173 100644
--- a/backend/plugins/teambition/impl/impl.go
+++ b/backend/plugins/teambition/impl/impl.go
@@ -19,7 +19,6 @@ package impl
import (
"fmt"
- "time"
"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/dal"
@@ -122,9 +121,8 @@ func (p Teambition) SubTaskMetas() []plugin.SubTaskMeta {
func (p Teambition) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes)
}
func (p Teambition) PrepareTaskData(taskCtx plugin.TaskContext, options
map[string]interface{}) (interface{}, errors.Error) {
@@ -152,16 +150,7 @@ func (p Teambition) PrepareTaskData(taskCtx
plugin.TaskContext, options map[stri
ApiClient: apiClient,
TenantId: connection.TenantId,
}
- var createdDateAfter time.Time
- if op.TimeAfter != "" {
- createdDateAfter, err =
errors.Convert01(time.Parse(time.RFC3339, op.TimeAfter))
- if err != nil {
- return nil, errors.BadInput.Wrap(err, "invalid value
for `createdDateAfter`")
- }
- }
- if !createdDateAfter.IsZero() {
- taskData.TimeAfter = &createdDateAfter
- }
+
return taskData, nil
}
diff --git a/backend/plugins/teambition/tasks/task_data.go
b/backend/plugins/teambition/tasks/task_data.go
index 6f4f30747..961f1f33d 100644
--- a/backend/plugins/teambition/tasks/task_data.go
+++ b/backend/plugins/teambition/tasks/task_data.go
@@ -18,16 +18,16 @@ limitations under the License.
package tasks
import (
+ "time"
+
"github.com/apache/incubator-devlake/core/errors"
helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
- "time"
)
type TeambitionOptions struct {
ConnectionId uint64 `json:"connectionId"`
ProjectId string `json:"projectId"`
PageSize uint64 `mapstruct:"pageSize"`
- TimeAfter string `json:"timeAfter"
mapstructure:"timeAfter,omitempty"`
CstZone *time.Location
TransformationRules TransformationRules `json:"transformationRules"`
}
@@ -35,7 +35,6 @@ type TeambitionOptions struct {
type TeambitionTaskData struct {
Options *TeambitionOptions
ApiClient *helper.ApiAsyncClient
- TimeAfter *time.Time
TenantId string
}
diff --git a/backend/plugins/trello/api/blueprint_v200.go
b/backend/plugins/trello/api/blueprint_v200.go
index 4329e37df..836e5cdd3 100644
--- a/backend/plugins/trello/api/blueprint_v200.go
+++ b/backend/plugins/trello/api/blueprint_v200.go
@@ -18,8 +18,6 @@ limitations under the License.
package api
import (
- "time"
-
"github.com/apache/incubator-devlake/core/models/domainlayer"
"github.com/apache/incubator-devlake/core/models/domainlayer/ticket"
@@ -38,7 +36,6 @@ func MakePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
scope []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
scopes, err := makeScopeV200(connectionId, scope)
if err != nil {
@@ -46,7 +43,7 @@ func MakePipelinePlanV200(
}
plan := make(coreModels.PipelinePlan, len(scope))
- plan, err = makePipelinePlanV200(subtaskMetas, plan, scope,
connectionId, syncPolicy)
+ plan, err = makePipelinePlanV200(subtaskMetas, plan, scope,
connectionId)
if err != nil {
return nil, nil, err
}
@@ -81,7 +78,7 @@ func makePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
plan coreModels.PipelinePlan,
scopes []*coreModels.BlueprintScope,
- connectionId uint64, syncPolicy *coreModels.SyncPolicy,
+ connectionId uint64,
) (coreModels.PipelinePlan, errors.Error) {
for i, scope := range scopes {
stage := plan[i]
@@ -93,9 +90,6 @@ func makePipelinePlanV200(
options := make(map[string]interface{})
options["connectionId"] = connectionId
options["scopeId"] = scope.ScopeId
- if syncPolicy.TimeAfter != nil {
- options["timeAfter"] =
syncPolicy.TimeAfter.Format(time.RFC3339)
- }
_, scopeConfig, err :=
scopeHelper.DbHelper().GetScopeAndConfig(connectionId, scope.ScopeId)
if err != nil {
diff --git a/backend/plugins/trello/impl/impl.go
b/backend/plugins/trello/impl/impl.go
index 39b177a0a..2b2edaaea 100644
--- a/backend/plugins/trello/impl/impl.go
+++ b/backend/plugins/trello/impl/impl.go
@@ -195,9 +195,8 @@ func (p Trello) ApiResources()
map[string]map[string]plugin.ApiResourceHandler {
func (p Trello) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
- return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes,
syncPolicy)
+ return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes)
}
func (p Trello) Close(taskCtx plugin.TaskContext) errors.Error {
diff --git a/backend/plugins/zentao/api/blueprint_V200_test.go
b/backend/plugins/zentao/api/blueprint_V200_test.go
index b379b7e3d..2cd62a269 100644
--- a/backend/plugins/zentao/api/blueprint_V200_test.go
+++ b/backend/plugins/zentao/api/blueprint_V200_test.go
@@ -71,10 +71,9 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
}*/
bpScopes := make([]*coreModels.BlueprintScope, 0)
bpScopes = append(bpScopes, bs)
- syncPolicy := &coreModels.SyncPolicy{}
plan := make(coreModels.PipelinePlan, len(bpScopes))
- plan, scopes, err := makePipelinePlanV200(nil, plan, bpScopes,
connection, syncPolicy)
+ plan, scopes, err := makePipelinePlanV200(nil, plan, bpScopes,
connection)
assert.Nil(t, err)
expectPlan := coreModels.PipelinePlan{
diff --git a/backend/plugins/zentao/api/blueprint_v200.go
b/backend/plugins/zentao/api/blueprint_v200.go
index b316cff43..d86105021 100644
--- a/backend/plugins/zentao/api/blueprint_v200.go
+++ b/backend/plugins/zentao/api/blueprint_v200.go
@@ -18,8 +18,6 @@ limitations under the License.
package api
import (
- "time"
-
"github.com/apache/incubator-devlake/core/errors"
coreModels "github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/models/domainlayer"
@@ -36,7 +34,6 @@ func MakeDataSourcePipelinePlanV200(
subtaskMetas []plugin.SubTaskMeta,
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
// get the connection info for url
connection := &models.ZentaoConnection{}
@@ -46,7 +43,7 @@ func MakeDataSourcePipelinePlanV200(
}
plan := make(coreModels.PipelinePlan, len(bpScopes))
- plan, scopes, err := makePipelinePlanV200(subtaskMetas, plan, bpScopes,
connection, syncPolicy)
+ plan, scopes, err := makePipelinePlanV200(subtaskMetas, plan, bpScopes,
connection)
if err != nil {
return nil, nil, err
}
@@ -59,7 +56,6 @@ func makePipelinePlanV200(
plan coreModels.PipelinePlan,
bpScopes []*coreModels.BlueprintScope,
connection *models.ZentaoConnection,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
domainScopes := make([]plugin.Scope, 0)
for i, bpScope := range bpScopes {
@@ -111,9 +107,6 @@ func makePipelinePlanV200(
}
}*/
- if syncPolicy.TimeAfter != nil {
- op.TimeAfter = syncPolicy.TimeAfter.Format(time.RFC3339)
- }
options, err := tasks.EncodeTaskOptions(op)
if err != nil {
return nil, nil, err
diff --git a/backend/plugins/zentao/impl/impl.go
b/backend/plugins/zentao/impl/impl.go
index cbea91b34..9f2f760cf 100644
--- a/backend/plugins/zentao/impl/impl.go
+++ b/backend/plugins/zentao/impl/impl.go
@@ -283,9 +283,8 @@ func (p Zentao) ApiResources()
map[string]map[string]plugin.ApiResourceHandler {
func (p Zentao) MakeDataSourcePipelinePlanV200(
connectionId uint64,
scopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
- return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes, syncPolicy)
+ return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(),
connectionId, scopes)
}
func (p Zentao) Close(taskCtx plugin.TaskContext) errors.Error {
diff --git a/backend/plugins/zentao/tasks/bug_commits_collector.go
b/backend/plugins/zentao/tasks/bug_commits_collector.go
index 63109bdfa..526ffc8a4 100644
--- a/backend/plugins/zentao/tasks/bug_commits_collector.go
+++ b/backend/plugins/zentao/tasks/bug_commits_collector.go
@@ -56,7 +56,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
Ctx: taskCtx,
Options: data.Options,
Table: RAW_BUG_COMMITS_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
diff --git a/backend/plugins/zentao/tasks/story_commits_collector.go
b/backend/plugins/zentao/tasks/story_commits_collector.go
index 1c1552d66..117fdc7d9 100644
--- a/backend/plugins/zentao/tasks/story_commits_collector.go
+++ b/backend/plugins/zentao/tasks/story_commits_collector.go
@@ -51,7 +51,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext)
errors.Error {
Ctx: taskCtx,
Options: data.Options,
Table: RAW_STORY_COMMITS_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
diff --git a/backend/plugins/zentao/tasks/task_commits_collector.go
b/backend/plugins/zentao/tasks/task_commits_collector.go
index 0d6bacd84..4d4c54feb 100644
--- a/backend/plugins/zentao/tasks/task_commits_collector.go
+++ b/backend/plugins/zentao/tasks/task_commits_collector.go
@@ -50,7 +50,7 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext)
errors.Error {
Ctx: taskCtx,
Options: data.Options,
Table: RAW_TASK_COMMITS_TABLE,
- }, data.TimeAfter)
+ })
if err != nil {
return err
}
diff --git a/backend/plugins/zentao/tasks/task_data.go
b/backend/plugins/zentao/tasks/task_data.go
index 9cf09292b..26bc9734c 100644
--- a/backend/plugins/zentao/tasks/task_data.go
+++ b/backend/plugins/zentao/tasks/task_data.go
@@ -19,7 +19,6 @@ package tasks
import (
"fmt"
- "time"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
@@ -37,7 +36,6 @@ type ZentaoOptions struct {
ConnectionId uint64 `json:"connectionId"`
ProjectId int64 `json:"projectId" mapstructure:"projectId"`
// TODO not support now
- TimeAfter string `json:"timeAfter"
mapstructure:"timeAfter,omitempty"`
ScopeConfigId uint64 `json:"scopeConfigId"
mapstructure:"scopeConfigId,omitempty"`
ScopeConfig *models.ZentaoScopeConfig `json:"scopeConfig"
mapstructure:"scopeConfig,omitempty"`
}
@@ -53,7 +51,6 @@ type ZentaoTaskData struct {
Options *ZentaoOptions
RemoteDb dal.Dal
- TimeAfter *time.Time
ProjectName string
Stories map[int64]struct{}
Tasks map[int64]struct{}
diff --git a/backend/server/api/blueprints/blueprints.go
b/backend/server/api/blueprints/blueprints.go
index 389247abb..b4f7410de 100644
--- a/backend/server/api/blueprints/blueprints.go
+++ b/backend/server/api/blueprints/blueprints.go
@@ -193,7 +193,6 @@ func Trigger(c *gin.Context) {
return
}
}
-
pipeline, err := services.TriggerBlueprint(id, syncPolicy)
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error
triggering blueprint"))
diff --git a/backend/server/services/blueprint.go
b/backend/server/services/blueprint.go
index 29246b43f..3d7e67c2e 100644
--- a/backend/server/services/blueprint.go
+++ b/backend/server/services/blueprint.go
@@ -53,12 +53,7 @@ type BlueprintJob struct {
func (bj BlueprintJob) Run() {
blueprint := bj.Blueprint
- syncPolicy := &models.SyncPolicy{
- TimeAfter: blueprint.TimeAfter,
- FullSync: blueprint.FullSync,
- SkipOnFail: blueprint.SkipOnFail,
- }
- pipeline, err := createPipelineByBlueprint(blueprint, syncPolicy)
+ pipeline, err := createPipelineByBlueprint(blueprint,
&blueprint.SyncPolicy)
if err == ErrEmptyPlan {
blueprintLog.Info("Empty plan, blueprint id:[%d] blueprint
name:[%s]", blueprint.ID, blueprint.Name)
return
@@ -272,11 +267,9 @@ func ReloadBlueprints(c *cron.Cron) errors.Error {
}
func createPipelineByBlueprint(blueprint *models.Blueprint, syncPolicy
*models.SyncPolicy) (*models.Pipeline, errors.Error) {
+
var plan models.PipelinePlan
var err errors.Error
- if syncPolicy != nil && syncPolicy.FullSync {
- blueprint.FullSync = true
- }
if blueprint.Mode == models.BLUEPRINT_MODE_NORMAL {
plan, err = MakePlanForBlueprint(blueprint, syncPolicy)
if err != nil {
@@ -292,12 +285,9 @@ func createPipelineByBlueprint(blueprint
*models.Blueprint, syncPolicy *models.S
newPipeline.BlueprintId = blueprint.ID
newPipeline.Labels = blueprint.Labels
newPipeline.SkipOnFail = blueprint.SkipOnFail
-
- if syncPolicy != nil && syncPolicy.FullSync {
- newPipeline.FullSync = true
- } else {
- newPipeline.FullSync = blueprint.FullSync
- }
+ newPipeline.TimeAfter = blueprint.TimeAfter
+ newPipeline.FullSync = blueprint.FullSync
+ newPipeline.SkipCollectors = blueprint.SkipCollectors
// if the plan is empty, we should not create the pipeline
var shouldCreatePipeline bool
@@ -326,11 +316,6 @@ func createPipelineByBlueprint(blueprint
*models.Blueprint, syncPolicy *models.S
// MakePlanForBlueprint generates pipeline plan by version
func MakePlanForBlueprint(blueprint *models.Blueprint, syncPolicy
*models.SyncPolicy) (models.PipelinePlan, errors.Error) {
- if syncPolicy != nil {
- syncPolicy.TimeAfter = blueprint.TimeAfter
- syncPolicy.FullSync = blueprint.FullSync
- }
-
var plan models.PipelinePlan
// load project metric plugins and convert it to a map
metrics := make(map[string]json.RawMessage)
@@ -344,7 +329,11 @@ func MakePlanForBlueprint(blueprint *models.Blueprint,
syncPolicy *models.SyncPo
metrics[projectMetric.PluginName] =
json.RawMessage(projectMetric.PluginOption)
}
}
- plan, err := GeneratePlanJsonV200(blueprint.ProjectName, syncPolicy,
blueprint.Connections, metrics)
+ skipCollectors := false
+ if syncPolicy != nil && syncPolicy.SkipCollectors {
+ skipCollectors = true
+ }
+ plan, err := GeneratePlanJsonV200(blueprint.ProjectName,
blueprint.Connections, metrics, skipCollectors)
if err != nil {
return nil, err
}
@@ -388,5 +377,8 @@ func TriggerBlueprint(id uint64, syncPolicy
*models.SyncPolicy) (*models.Pipelin
logger.Error(err, "GetBlueprint, id: %d", id)
return nil, err
}
+ blueprint.SkipCollectors = syncPolicy.SkipCollectors
+ blueprint.FullSync = syncPolicy.FullSync
+
return createPipelineByBlueprint(blueprint, syncPolicy)
}
diff --git a/backend/server/services/blueprint_makeplan_v200.go
b/backend/server/services/blueprint_makeplan_v200.go
index b4f1bfff7..69e11bfc2 100644
--- a/backend/server/services/blueprint_makeplan_v200.go
+++ b/backend/server/services/blueprint_makeplan_v200.go
@@ -30,9 +30,9 @@ import (
// GeneratePlanJsonV200 generates pipeline plan according v2.0.0 definition
func GeneratePlanJsonV200(
projectName string,
- syncPolicy *coreModels.SyncPolicy,
connections []*coreModels.BlueprintConnection,
metrics map[string]json.RawMessage,
+ skipCollectors bool,
) (coreModels.PipelinePlan, errors.Error) {
var err errors.Error
// make plan for data-source coreModels fist. generate plan for each
@@ -56,7 +56,6 @@ func GeneratePlanJsonV200(
sourcePlans[i], pluginScopes, err =
pluginBp.MakeDataSourcePipelinePlanV200(
connection.ConnectionId,
connection.Scopes,
- syncPolicy,
)
if err != nil {
return nil, err
@@ -72,7 +71,7 @@ func GeneratePlanJsonV200(
}
// skip collectors
- if syncPolicy != nil && syncPolicy.SkipCollectors {
+ if skipCollectors {
for i, plan := range sourcePlans {
for j, stage := range plan {
for k, task := range stage {
diff --git a/backend/server/services/blueprint_makeplan_v200_test.go
b/backend/server/services/blueprint_makeplan_v200_test.go
index 093b9ea69..646996d4d 100644
--- a/backend/server/services/blueprint_makeplan_v200_test.go
+++ b/backend/server/services/blueprint_makeplan_v200_test.go
@@ -97,7 +97,7 @@ func TestMakePlanV200(t *testing.T) {
doraName: nil,
}
- plan, err := GeneratePlanJsonV200(projectName, syncPolicy, connections,
metrics)
+ plan, err := GeneratePlanJsonV200(projectName, connections, metrics,
false)
assert.Nil(t, err)
assert.Equal(t, expectedPlan, plan)
diff --git a/backend/server/services/pipeline_helper.go
b/backend/server/services/pipeline_helper.go
index a681608e0..f7077b5f6 100644
--- a/backend/server/services/pipeline_helper.go
+++ b/backend/server/services/pipeline_helper.go
@@ -61,8 +61,10 @@ func CreateDbPipeline(newPipeline *models.NewPipeline)
(pipeline *models.Pipelin
Message: "",
SpentSeconds: 0,
Plan: newPipeline.Plan,
- SkipOnFail: newPipeline.SkipOnFail,
- FullSync: newPipeline.FullSync,
+ SyncPolicy: models.SyncPolicy{
+ SkipOnFail: newPipeline.SkipOnFail,
+ FullSync: newPipeline.FullSync,
+ },
}
if newPipeline.BlueprintId != 0 {
dbPipeline.BlueprintId = newPipeline.BlueprintId
diff --git a/backend/server/services/remote/plugin/plugin_extensions.go
b/backend/server/services/remote/plugin/plugin_extensions.go
index 09e070383..5f1f4612c 100644
--- a/backend/server/services/remote/plugin/plugin_extensions.go
+++ b/backend/server/services/remote/plugin/plugin_extensions.go
@@ -43,7 +43,6 @@ func (p remoteMetricPlugin)
MakeMetricPluginPipelinePlanV200(projectName string,
func (p remoteDatasourcePlugin) MakeDataSourcePipelinePlanV200(
connectionId uint64,
bpScopes []*coreModels.BlueprintScope,
- syncPolicy *coreModels.SyncPolicy,
) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
connection := p.connectionTabler.New()
err := p.connHelper.FirstById(connection, connectionId)
diff --git a/backend/test/helper/api.go b/backend/test/helper/api.go
index 5caf817d0..9099f90ad 100644
--- a/backend/test/helper/api.go
+++ b/backend/test/helper/api.go
@@ -86,8 +86,10 @@ func (d *DevlakeClient) CreateBasicBlueprintV2(name string,
config *BlueprintV2C
Enable: true,
CronConfig: "manual",
IsManual: true,
- SkipOnFail: config.SkipOnFail,
- Labels: []string{"test-label"},
+ SyncPolicy: models.SyncPolicy{
+ SkipOnFail: config.SkipOnFail,
+ },
+ Labels: []string{"test-label"},
Connections: []*models.BlueprintConnection{
config.Connection,
},