This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new a511098a2 fix: jira/jenkins/github/tapd incremental collectors (#6078)
a511098a2 is described below
commit a511098a2bb7c3a19ac3e7648ed2c57313b8e98b
Author: abeizn <[email protected]>
AuthorDate: Thu Sep 14 16:08:33 2023 +0800
fix: jira/jenkins/github/tapd incremental collectors (#6078)
* fix: jira incremental collectors
* fix: github incremental collectors
* fix: tapd incremental collectors
* fix: syncPolicy can not be updated
* fix: jenkins incremental collectors
* fix: remove some unnecessary comments
---
backend/plugins/github/tasks/comment_collector.go | 7 ++---
backend/plugins/github/tasks/commit_collector.go | 5 ++--
backend/plugins/github/tasks/issue_collector.go | 5 ++--
.../github/tasks/pr_review_comment_collector.go | 7 ++---
backend/plugins/jenkins/tasks/stage_collector.go | 32 +++++++++++++---------
backend/plugins/jira/tasks/issue_collector.go | 9 +-----
.../plugins/tapd/tasks/bug_changelog_collector.go | 11 ++------
backend/plugins/tapd/tasks/bug_collector.go | 11 ++------
backend/plugins/tapd/tasks/iteration_collector.go | 11 ++------
.../tapd/tasks/story_changelog_collector.go | 11 ++------
backend/plugins/tapd/tasks/story_collector.go | 11 ++------
.../plugins/tapd/tasks/task_changelog_collector.go | 11 ++------
backend/plugins/tapd/tasks/task_collector.go | 11 ++------
backend/plugins/tapd/tasks/worklog_collector.go | 11 ++------
backend/server/services/blueprint.go | 13 +++++----
15 files changed, 59 insertions(+), 107 deletions(-)
diff --git a/backend/plugins/github/tasks/comment_collector.go
b/backend/plugins/github/tasks/comment_collector.go
index 21e57f853..197a1f7a0 100644
--- a/backend/plugins/github/tasks/comment_collector.go
+++ b/backend/plugins/github/tasks/comment_collector.go
@@ -59,7 +59,6 @@ func CollectApiComments(taskCtx plugin.SubTaskContext)
errors.Error {
}
incremental := collectorWithState.IsIncremental()
- syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
@@ -69,11 +68,9 @@ func CollectApiComments(taskCtx plugin.SubTaskContext)
errors.Error {
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+ if collectorWithState.TimeAfter != nil {
// Note that `since` is for filtering records
by the `updated` time
- // which is not ideal for semantic reasons and
would result in slightly more records than expected.
- // But we have no choice since it is the only
available field we could exploit from the API.
- query.Set("since",
syncPolicy.TimeAfter.String())
+ query.Set("since",
collectorWithState.TimeAfter.String())
}
// if incremental == true, we overwrite it
if incremental {
diff --git a/backend/plugins/github/tasks/commit_collector.go
b/backend/plugins/github/tasks/commit_collector.go
index c8ed10780..ca37c6227 100644
--- a/backend/plugins/github/tasks/commit_collector.go
+++ b/backend/plugins/github/tasks/commit_collector.go
@@ -59,7 +59,6 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext)
errors.Error {
}
incremental := collectorWithState.IsIncremental()
- syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
@@ -80,8 +79,8 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext)
errors.Error {
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- query.Set("since",
syncPolicy.TimeAfter.String())
+ if collectorWithState.TimeAfter != nil {
+ query.Set("since",
collectorWithState.TimeAfter.String())
}
if incremental {
query.Set("since",
collectorWithState.LatestState.LatestSuccessStart.String())
diff --git a/backend/plugins/github/tasks/issue_collector.go
b/backend/plugins/github/tasks/issue_collector.go
index 551081c2d..c27eaea86 100644
--- a/backend/plugins/github/tasks/issue_collector.go
+++ b/backend/plugins/github/tasks/issue_collector.go
@@ -59,7 +59,6 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
}
incremental := collectorWithState.IsIncremental()
- syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
@@ -80,8 +79,8 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- query.Set("since",
syncPolicy.TimeAfter.String())
+ if collectorWithState.TimeAfter != nil {
+ query.Set("since",
collectorWithState.TimeAfter.String())
}
if incremental {
query.Set("since",
collectorWithState.LatestState.LatestSuccessStart.String())
diff --git a/backend/plugins/github/tasks/pr_review_comment_collector.go
b/backend/plugins/github/tasks/pr_review_comment_collector.go
index bd03b032b..eb3720ce0 100644
--- a/backend/plugins/github/tasks/pr_review_comment_collector.go
+++ b/backend/plugins/github/tasks/pr_review_comment_collector.go
@@ -62,7 +62,6 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext)
errors.Error {
}
incremental := collectorWithState.IsIncremental()
- syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
@@ -77,11 +76,9 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext)
errors.Error {
UrlTemplate: "repos/{{ .Params.Name }}/pulls/comments",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+ if collectorWithState.TimeAfter != nil {
// Note that `since` is for filtering records
by the `updated` time
- // which is not ideal for semantic reasons and
would result in slightly more records than expected.
- // But we have no choice since it is the only
available field we could exploit from the API.
- query.Set("since",
syncPolicy.TimeAfter.String())
+ query.Set("since",
collectorWithState.TimeAfter.String())
}
// if incremental == true, we overwrite it
if incremental {
diff --git a/backend/plugins/jenkins/tasks/stage_collector.go
b/backend/plugins/jenkins/tasks/stage_collector.go
index 6c8478750..bf159c1d6 100644
--- a/backend/plugins/jenkins/tasks/stage_collector.go
+++ b/backend/plugins/jenkins/tasks/stage_collector.go
@@ -48,15 +48,29 @@ type SimpleBuild struct {
func CollectApiStages(taskCtx plugin.SubTaskContext) errors.Error {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*JenkinsTaskData)
+
+ collectorWithState, err :=
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+ Params: JenkinsApiParams{
+ ConnectionId: data.Options.ConnectionId,
+ FullName: data.Options.JobFullName,
+ },
+ Ctx: taskCtx,
+ Table: RAW_STAGE_TABLE,
+ })
+ if err != nil {
+ return err
+ }
+
clauses := []dal.Clause{
dal.Select("tjb.number,tjb.full_name"),
dal.From("_tool_jenkins_builds as tjb"),
dal.Where(`tjb.connection_id = ? and tjb.job_path = ? and
tjb.job_name = ? and tjb.class = ?`,
data.Options.ConnectionId, data.Options.JobPath,
data.Options.JobName, "WorkflowRun"),
}
- syncPolicy := taskCtx.TaskContext().SyncPolicy()
- if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- clauses = append(clauses, dal.Where(`tjb.start_time >= ?`,
syncPolicy.TimeAfter))
+
+ incremental := collectorWithState.IsIncremental()
+ if incremental && collectorWithState.LatestState.LatestSuccessStart !=
nil {
+ clauses = append(clauses, dal.Where(`tjb.start_time >= ?`,
collectorWithState.LatestState.LatestSuccessStart))
}
cursor, err := db.Cursor(clauses...)
@@ -70,15 +84,7 @@ func CollectApiStages(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- collector, err := api.NewApiCollector(api.ApiCollectorArgs{
- RawDataSubTaskArgs: api.RawDataSubTaskArgs{
- Params: JenkinsApiParams{
- ConnectionId: data.Options.ConnectionId,
- FullName: data.Options.JobFullName,
- },
- Ctx: taskCtx,
- Table: RAW_STAGE_TABLE,
- },
+ err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
Input: iterator,
UrlTemplate: fmt.Sprintf("%sjob/%s/{{ .Input.Number
}}/wfapi/describe", data.Options.JobPath, data.Options.JobName),
@@ -106,5 +112,5 @@ func CollectApiStages(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}
diff --git a/backend/plugins/jira/tasks/issue_collector.go
b/backend/plugins/jira/tasks/issue_collector.go
index a6cf3b8ea..b00cd4ff3 100644
--- a/backend/plugins/jira/tasks/issue_collector.go
+++ b/backend/plugins/jira/tasks/issue_collector.go
@@ -77,14 +77,7 @@ func CollectIssues(taskCtx plugin.SubTaskContext)
errors.Error {
} else {
logger.Info("got user's timezone: %v", loc.String())
}
-
- jql := ""
- syncPolicy := taskCtx.TaskContext().SyncPolicy()
- if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- jql = buildJQL(syncPolicy.TimeAfter,
collectorWithState.LatestState.LatestSuccessStart, incremental, loc)
- } else {
- jql = buildJQL(nil,
collectorWithState.LatestState.LatestSuccessStart, incremental, loc)
- }
+ jql := buildJQL(collectorWithState.TimeAfter,
collectorWithState.LatestState.LatestSuccessStart, incremental, loc)
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
diff --git a/backend/plugins/tapd/tasks/bug_changelog_collector.go
b/backend/plugins/tapd/tasks/bug_changelog_collector.go
index b967cfa78..264e45342 100644
--- a/backend/plugins/tapd/tasks/bug_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/bug_changelog_collector.go
@@ -39,7 +39,6 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
incremental := collectorWithState.IsIncremental()
- syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
Incremental: incremental,
@@ -52,15 +51,11 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created desc")
- if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- query.Set("created",
- fmt.Sprintf(">%s",
-
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.TimeAfter != nil {
+ query.Set("created", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
}
if incremental {
- query.Set("created",
- fmt.Sprintf(">%s",
-
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+ query.Set("created", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/plugins/tapd/tasks/bug_collector.go
b/backend/plugins/tapd/tasks/bug_collector.go
index 3265d320f..b5c85f68a 100644
--- a/backend/plugins/tapd/tasks/bug_collector.go
+++ b/backend/plugins/tapd/tasks/bug_collector.go
@@ -39,7 +39,6 @@ func CollectBugs(taskCtx plugin.SubTaskContext) errors.Error {
return err
}
incremental := collectorWithState.IsIncremental()
- syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
Incremental: incremental,
ApiClient: data.ApiClient,
@@ -52,15 +51,11 @@ func CollectBugs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- query.Set("modified",
- fmt.Sprintf(">%s",
-
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.TimeAfter != nil {
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
}
if incremental {
- query.Set("modified",
- fmt.Sprintf(">%s",
-
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/plugins/tapd/tasks/iteration_collector.go
b/backend/plugins/tapd/tasks/iteration_collector.go
index fe21fbc4f..3a6ebc182 100644
--- a/backend/plugins/tapd/tasks/iteration_collector.go
+++ b/backend/plugins/tapd/tasks/iteration_collector.go
@@ -41,7 +41,6 @@ func CollectIterations(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
incremental := collectorWithState.IsIncremental()
- syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(api.ApiCollectorArgs{
Incremental: incremental,
ApiClient: data.ApiClient,
@@ -54,15 +53,11 @@ func CollectIterations(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- query.Set("modified",
- fmt.Sprintf(">%s",
-
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.TimeAfter != nil {
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
}
if incremental {
- query.Set("modified",
- fmt.Sprintf(">%s",
-
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/plugins/tapd/tasks/story_changelog_collector.go
b/backend/plugins/tapd/tasks/story_changelog_collector.go
index a0bad43bd..7aa863417 100644
--- a/backend/plugins/tapd/tasks/story_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/story_changelog_collector.go
@@ -39,7 +39,6 @@ func CollectStoryChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
logger := taskCtx.GetLogger()
logger.Info("collect storyChangelogs")
incremental := collectorWithState.IsIncremental()
- syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
Incremental: incremental,
@@ -52,15 +51,11 @@ func CollectStoryChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- query.Set("created",
- fmt.Sprintf(">%s",
-
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.TimeAfter != nil {
+ query.Set("created", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
}
if incremental {
- query.Set("created",
- fmt.Sprintf(">%s",
-
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+ query.Set("created", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/plugins/tapd/tasks/story_collector.go
b/backend/plugins/tapd/tasks/story_collector.go
index e88dd87ac..211c7f268 100644
--- a/backend/plugins/tapd/tasks/story_collector.go
+++ b/backend/plugins/tapd/tasks/story_collector.go
@@ -39,7 +39,6 @@ func CollectStorys(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
incremental := collectorWithState.IsIncremental()
- syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
Incremental: incremental,
ApiClient: data.ApiClient,
@@ -52,15 +51,11 @@ func CollectStorys(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- query.Set("modified",
- fmt.Sprintf(">%s",
-
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.TimeAfter != nil {
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
}
if incremental {
- query.Set("modified",
- fmt.Sprintf(">%s",
-
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/plugins/tapd/tasks/task_changelog_collector.go
b/backend/plugins/tapd/tasks/task_changelog_collector.go
index 9a457c7bd..e4efec684 100644
--- a/backend/plugins/tapd/tasks/task_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/task_changelog_collector.go
@@ -39,7 +39,6 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
logger := taskCtx.GetLogger()
logger.Info("collect taskChangelogs")
incremental := collectorWithState.IsIncremental()
- syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
Incremental: incremental,
ApiClient: data.ApiClient,
@@ -51,15 +50,11 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- query.Set("created",
- fmt.Sprintf(">%s",
-
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.TimeAfter != nil {
+ query.Set("created", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
}
if incremental {
- query.Set("created",
- fmt.Sprintf(">%s",
-
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+ query.Set("created", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/plugins/tapd/tasks/task_collector.go
b/backend/plugins/tapd/tasks/task_collector.go
index 36d505e5c..3511fff11 100644
--- a/backend/plugins/tapd/tasks/task_collector.go
+++ b/backend/plugins/tapd/tasks/task_collector.go
@@ -39,7 +39,6 @@ func CollectTasks(taskCtx plugin.SubTaskContext) errors.Error
{
return err
}
incremental := collectorWithState.IsIncremental()
- syncPolicy := taskCtx.TaskContext().SyncPolicy()
collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
Incremental: incremental,
@@ -53,15 +52,11 @@ func CollectTasks(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- query.Set("modified",
- fmt.Sprintf(">%s",
-
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.TimeAfter != nil {
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
}
if incremental {
- query.Set("modified",
- fmt.Sprintf(">%s",
-
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/plugins/tapd/tasks/worklog_collector.go
b/backend/plugins/tapd/tasks/worklog_collector.go
index 845fbe161..935f89758 100644
--- a/backend/plugins/tapd/tasks/worklog_collector.go
+++ b/backend/plugins/tapd/tasks/worklog_collector.go
@@ -39,7 +39,6 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
incremental := collectorWithState.IsIncremental()
- syncPolicy := taskCtx.TaskContext().SyncPolicy()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
Incremental: incremental,
ApiClient: data.ApiClient,
@@ -51,15 +50,11 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- query.Set("modified",
- fmt.Sprintf(">%s",
-
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.TimeAfter != nil {
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
}
if incremental {
- query.Set("modified",
- fmt.Sprintf(">%s",
-
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
+ query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/server/services/blueprint.go
b/backend/server/services/blueprint.go
index 7176ddabc..ab4097a95 100644
--- a/backend/server/services/blueprint.go
+++ b/backend/server/services/blueprint.go
@@ -161,13 +161,8 @@ func validateBlueprintAndMakePlan(blueprint
*models.Blueprint) errors.Error {
return errors.BadInput.New("invalid plan")
}
} else if blueprint.Mode == models.BLUEPRINT_MODE_NORMAL {
- syncPolicy := &models.SyncPolicy{
- TimeAfter: blueprint.TimeAfter,
- FullSync: blueprint.FullSync,
- SkipOnFail: blueprint.SkipOnFail,
- }
var e errors.Error
- blueprint.Plan, e = MakePlanForBlueprint(blueprint, syncPolicy)
+ blueprint.Plan, e = MakePlanForBlueprint(blueprint,
&blueprint.SyncPolicy)
if err != nil {
return e
}
@@ -208,10 +203,16 @@ func PatchBlueprint(id uint64, body
map[string]interface{}) (*models.Blueprint,
if err != nil {
return nil, err
}
+
// make sure mode is not being updated
if originMode != blueprint.Mode {
return nil, errors.Default.New("mode is not updatable")
}
+ // syncPolicy can be updated, so we need to decode it again
+ err = helper.DecodeMapStruct(body, &blueprint.SyncPolicy, true)
+ if err != nil {
+ return nil, err
+ }
blueprint, err = saveBlueprint(blueprint)
if err != nil {