This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new a5b09dbd6 fix(gitlab): fix incremental deployment collector (#6254)
a5b09dbd6 is described below
commit a5b09dbd605620a51f3f3131084cdbc259286b07
Author: Lynwee <[email protected]>
AuthorDate: Mon Oct 16 20:34:06 2023 +0800
fix(gitlab): fix incremental deployment collector (#6254)
* fix(gitlab): fix incremental deployment collector
* fix(gitlab): rollback to full collector
* refactor(gitlab): remove unused codes
* fix(gitlab): collect deployment incrementally
---
.../pluginhelper/api/api_collector_with_state.go | 28 ++++++++++++----------
backend/plugins/bitbucket/tasks/api_common.go | 6 ++---
backend/plugins/github/tasks/cicd_job_collector.go | 2 +-
.../plugins/github/tasks/pr_commit_collector.go | 2 +-
.../plugins/github/tasks/pr_review_collector.go | 2 +-
.../plugins/github_graphql/tasks/job_collector.go | 2 +-
.../plugins/gitlab/tasks/deployment_collector.go | 6 ++++-
backend/plugins/jenkins/tasks/stage_collector.go | 2 +-
.../jira/tasks/development_panel_collector.go | 2 +-
.../jira/tasks/issue_changelog_collector.go | 2 +-
.../plugins/jira/tasks/issue_comment_collector.go | 2 +-
backend/plugins/jira/tasks/remotelink_collector.go | 2 +-
backend/plugins/jira/tasks/worklog_collector.go | 2 +-
.../plugins/zentao/tasks/bug_commits_collector.go | 2 +-
.../zentao/tasks/story_commits_collector.go | 2 +-
.../plugins/zentao/tasks/task_commits_collector.go | 2 +-
16 files changed, 36 insertions(+), 30 deletions(-)
diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go
b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index 1fac444ec..c4b22f110 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -35,10 +35,11 @@ type ApiCollectorStateManager struct {
RawDataSubTaskArgs
// *ApiCollector
// *GraphqlCollector
- subtasks []plugin.SubTask
- newState models.CollectorLatestState
- IsIncreamtal bool
- Since *time.Time
+ subtasks []plugin.SubTask
+ newState models.CollectorLatestState
+ IsIncremental bool
+ Since *time.Time
+ Before *time.Time
}
// NewStatefulApiCollector create a new ApiCollectorStateManager
@@ -69,30 +70,30 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs)
(*ApiCollectorStateManager
// Calculate incremental and since based on syncPolicy and old state
syncPolicy := args.Ctx.TaskContext().SyncPolicy()
- var isIncreamtal bool
+ var isIncremental bool
var since *time.Time
if syncPolicy == nil {
// 1. If no syncPolicy, incremental and since is
oldState.LatestSuccessStart
- isIncreamtal = true
+ isIncremental = true
since = oldLatestSuccessStart
} else if oldLatestSuccessStart == nil {
// 2. If no oldState.LatestSuccessStart, not incremental and
since is syncPolicy.TimeAfter
- isIncreamtal = false
+ isIncremental = false
since = syncPolicy.TimeAfter
} else if syncPolicy.FullSync {
// 3. If fullSync true, not incremental and since is
syncPolicy.TimeAfter
- isIncreamtal = false
+ isIncremental = false
since = syncPolicy.TimeAfter
} else if syncPolicy.TimeAfter != nil {
// 4. If syncPolicy.TimeAfter not nil
if oldTimeAfter != nil &&
syncPolicy.TimeAfter.Before(*oldTimeAfter) {
// 4.1 If oldTimeAfter not nil and syncPolicy.TimeAfter
before oldTimeAfter, incremental is false and since is syncPolicy.TimeAfter
- isIncreamtal = false
+ isIncremental = false
since = syncPolicy.TimeAfter
} else {
// 4.2 If oldTimeAfter nil or syncPolicy.TimeAfter
after oldTimeAfter, incremental is true and since is oldState.LatestSuccessStart
- isIncreamtal = true
+ isIncremental = true
since = oldLatestSuccessStart
}
}
@@ -104,8 +105,9 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs)
(*ApiCollectorStateManager
return &ApiCollectorStateManager{
RawDataSubTaskArgs: args,
newState: oldState,
- IsIncreamtal: isIncreamtal,
+ IsIncremental: isIncremental,
Since: since,
+ Before: &currentTime,
}, nil
}
@@ -113,7 +115,7 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs)
(*ApiCollectorStateManager
// InitCollector init the embedded collector
func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs)
errors.Error {
args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
- args.Incremental = args.Incremental || m.IsIncreamtal
+ args.Incremental = args.Incremental || m.IsIncremental
apiCollector, err := NewApiCollector(args)
if err != nil {
return err
@@ -180,7 +182,7 @@ func NewStatefulApiCollectorForFinalizableEntity(args
FinalizableApiCollectorArg
}
createdAfter := manager.Since
- isIncremental := manager.IsIncreamtal
+ isIncremental := manager.IsIncremental
// step 1: create a collector to collect newly added records
err = manager.InitCollector(ApiCollectorArgs{
ApiClient: args.ApiClient,
diff --git a/backend/plugins/bitbucket/tasks/api_common.go
b/backend/plugins/bitbucket/tasks/api_common.go
index 7e2e76775..8990fdbaf 100644
--- a/backend/plugins/bitbucket/tasks/api_common.go
+++ b/backend/plugins/bitbucket/tasks/api_common.go
@@ -175,7 +175,7 @@ func GetPullRequestsIterator(taskCtx plugin.SubTaskContext,
collectorWithState *
data.Options.FullName, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*collectorWithState.Since))
}
@@ -199,7 +199,7 @@ func GetIssuesIterator(taskCtx plugin.SubTaskContext,
collectorWithState *api.Ap
data.Options.FullName, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*collectorWithState.Since))
}
// construct the input iterator
@@ -222,7 +222,7 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext,
collectorWithState *api
data.Options.FullName, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Where("bitbucket_complete_on >
?", *collectorWithState.Since))
}
// construct the input iterator
diff --git a/backend/plugins/github/tasks/cicd_job_collector.go
b/backend/plugins/github/tasks/cicd_job_collector.go
index 93617f779..1c9fe63e6 100644
--- a/backend/plugins/github/tasks/cicd_job_collector.go
+++ b/backend/plugins/github/tasks/cicd_job_collector.go
@@ -73,7 +73,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
data.Options.GithubId, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Where("github_updated_at > ?",
collectorWithState.Since))
}
cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/github/tasks/pr_commit_collector.go
b/backend/plugins/github/tasks/pr_commit_collector.go
index 77c0e2a26..25c5402ac 100644
--- a/backend/plugins/github/tasks/pr_commit_collector.go
+++ b/backend/plugins/github/tasks/pr_commit_collector.go
@@ -78,7 +78,7 @@ func CollectApiPullRequestCommits(taskCtx
plugin.SubTaskContext) errors.Error {
dal.From(models.GithubPullRequest{}.TableName()),
dal.Where("repo_id = ? and connection_id=?",
data.Options.GithubId, data.Options.ConnectionId),
}
- if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(
clauses,
dal.Where("github_updated_at > ?",
collectorWithState.Since),
diff --git a/backend/plugins/github/tasks/pr_review_collector.go
b/backend/plugins/github/tasks/pr_review_collector.go
index 6cc8d8ce9..a0f6576ef 100644
--- a/backend/plugins/github/tasks/pr_review_collector.go
+++ b/backend/plugins/github/tasks/pr_review_collector.go
@@ -70,7 +70,7 @@ func CollectApiPullRequestReviews(taskCtx
plugin.SubTaskContext) errors.Error {
dal.From(models.GithubPullRequest{}.TableName()),
dal.Where("repo_id = ? and connection_id=?",
data.Options.GithubId, data.Options.ConnectionId),
}
- if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(
clauses,
dal.Where("github_updated_at > ?",
collectorWithState.Since),
diff --git a/backend/plugins/github_graphql/tasks/job_collector.go
b/backend/plugins/github_graphql/tasks/job_collector.go
index 41836d3a8..7c838f980 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -122,7 +122,7 @@ func CollectGraphqlJobs(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where("repo_id = ? and connection_id=?",
data.Options.GithubId, data.Options.ConnectionId),
dal.Orderby("github_updated_at DESC"),
}
- if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Where("github_updated_at > ?",
*collectorWithState.Since))
}
diff --git a/backend/plugins/gitlab/tasks/deployment_collector.go
b/backend/plugins/gitlab/tasks/deployment_collector.go
index cdd45f712..89c86721a 100644
--- a/backend/plugins/gitlab/tasks/deployment_collector.go
+++ b/backend/plugins/gitlab/tasks/deployment_collector.go
@@ -61,10 +61,14 @@ func CollectDeployment(taskCtx plugin.SubTaskContext)
errors.Error {
return query, err
}
// https://gitlab.com/gitlab-org/gitlab/-/issues/328500
- query.Set("order_by", "created_at")
+ //
https://docs.gitlab.com/ee/api/deployments.html#list-project-deployments
+ query.Set("order_by", "updated_at")
if collectorWithState.Since != nil {
query.Set("updated_after",
collectorWithState.Since.Format(time.RFC3339))
}
+ if collectorWithState.Before != nil {
+ query.Set("updated_before",
collectorWithState.Before.Format(time.RFC3339))
+ }
return query, nil
},
GetTotalPages: GetTotalPagesFromResponse,
diff --git a/backend/plugins/jenkins/tasks/stage_collector.go
b/backend/plugins/jenkins/tasks/stage_collector.go
index 47530c95e..4503d8560 100644
--- a/backend/plugins/jenkins/tasks/stage_collector.go
+++ b/backend/plugins/jenkins/tasks/stage_collector.go
@@ -67,7 +67,7 @@ func CollectApiStages(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where(`tjb.connection_id = ? and tjb.job_path = ? and
tjb.job_name = ? and tjb.class = ?`,
data.Options.ConnectionId, data.Options.JobPath,
data.Options.JobName, "WorkflowRun"),
}
- if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Where(`tjb.start_time >= ?`,
collectorWithState.Since))
}
cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/jira/tasks/development_panel_collector.go
b/backend/plugins/jira/tasks/development_panel_collector.go
index 64a7ea372..c513c5bf5 100644
--- a/backend/plugins/jira/tasks/development_panel_collector.go
+++ b/backend/plugins/jira/tasks/development_panel_collector.go
@@ -72,7 +72,7 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ?",
data.Options.ConnectionId, data.Options.BoardId),
}
- if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
}
cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/jira/tasks/issue_changelog_collector.go
b/backend/plugins/jira/tasks/issue_changelog_collector.go
index bcbf46dc0..9f29436c9 100644
--- a/backend/plugins/jira/tasks/issue_changelog_collector.go
+++ b/backend/plugins/jira/tasks/issue_changelog_collector.go
@@ -71,7 +71,7 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ? AND
i.std_type != ? and i.changelog_total > 100", data.Options.ConnectionId,
data.Options.BoardId, "Epic"),
}
- if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
}
diff --git a/backend/plugins/jira/tasks/issue_comment_collector.go
b/backend/plugins/jira/tasks/issue_comment_collector.go
index 71c91491e..aa6ca2cdd 100644
--- a/backend/plugins/jira/tasks/issue_comment_collector.go
+++ b/backend/plugins/jira/tasks/issue_comment_collector.go
@@ -71,7 +71,7 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ? AND
i.std_type != ? AND i.comment_total > 100", data.Options.ConnectionId,
data.Options.BoardId, "Epic"),
}
- if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
}
if logger.IsLevelEnabled(log.LOG_DEBUG) {
diff --git a/backend/plugins/jira/tasks/remotelink_collector.go
b/backend/plugins/jira/tasks/remotelink_collector.go
index 450723ec2..2072b1f28 100644
--- a/backend/plugins/jira/tasks/remotelink_collector.go
+++ b/backend/plugins/jira/tasks/remotelink_collector.go
@@ -70,7 +70,7 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ?",
data.Options.ConnectionId, data.Options.BoardId),
}
- if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
}
cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/jira/tasks/worklog_collector.go
b/backend/plugins/jira/tasks/worklog_collector.go
index 71121597f..a22005bc3 100644
--- a/backend/plugins/jira/tasks/worklog_collector.go
+++ b/backend/plugins/jira/tasks/worklog_collector.go
@@ -66,7 +66,7 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where("i.updated > i.created AND bi.connection_id = ? AND
bi.board_id = ? ", data.Options.ConnectionId, data.Options.BoardId),
dal.Groupby("i.issue_id, i.updated"),
}
- if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Having("i.updated > ? AND
(i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND
COUNT(wl.worklog_id) > 0))", collectorWithState.Since))
} else {
/*
diff --git a/backend/plugins/zentao/tasks/bug_commits_collector.go
b/backend/plugins/zentao/tasks/bug_commits_collector.go
index cb55ceda6..c1ee2f74e 100644
--- a/backend/plugins/zentao/tasks/bug_commits_collector.go
+++ b/backend/plugins/zentao/tasks/bug_commits_collector.go
@@ -70,7 +70,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
data.Options.ProjectId, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Where("last_edited_date is not
null and last_edited_date > ?", collectorWithState.Since))
}
cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/zentao/tasks/story_commits_collector.go
b/backend/plugins/zentao/tasks/story_commits_collector.go
index a536f52f1..489ad2945 100644
--- a/backend/plugins/zentao/tasks/story_commits_collector.go
+++ b/backend/plugins/zentao/tasks/story_commits_collector.go
@@ -66,7 +66,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where(`_tool_zentao_project_stories.project_id = ? and
_tool_zentao_project_stories.connection_id = ?`,
data.Options.ProjectId, data.Options.ConnectionId),
}
- if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Where("last_edited_date is not
null and last_edited_date > ?", collectorWithState.Since))
}
cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/zentao/tasks/task_commits_collector.go
b/backend/plugins/zentao/tasks/task_commits_collector.go
index 665069508..5a3204302 100644
--- a/backend/plugins/zentao/tasks/task_commits_collector.go
+++ b/backend/plugins/zentao/tasks/task_commits_collector.go
@@ -64,7 +64,7 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext)
errors.Error {
data.Options.ProjectId, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+ if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Where("last_edited_date is not
null and last_edited_date > ?", collectorWithState.Since))
}
cursor, err := db.Cursor(clauses...)