This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new a5b09dbd6 fix(gitlab): fix incremental deployment collector (#6254)
a5b09dbd6 is described below

commit a5b09dbd605620a51f3f3131084cdbc259286b07
Author: Lynwee <[email protected]>
AuthorDate: Mon Oct 16 20:34:06 2023 +0800

    fix(gitlab): fix incremental deployment collector (#6254)
    
    * fix(gitlab): fix incremental deployment collector
    
    * fix(gitlab): rollback to full collector
    
    * refactor(gitlab): remove unused codes
    
    * fix(gitlab): collect deployment incrementally
---
 .../pluginhelper/api/api_collector_with_state.go   | 28 ++++++++++++----------
 backend/plugins/bitbucket/tasks/api_common.go      |  6 ++---
 backend/plugins/github/tasks/cicd_job_collector.go |  2 +-
 .../plugins/github/tasks/pr_commit_collector.go    |  2 +-
 .../plugins/github/tasks/pr_review_collector.go    |  2 +-
 .../plugins/github_graphql/tasks/job_collector.go  |  2 +-
 .../plugins/gitlab/tasks/deployment_collector.go   |  6 ++++-
 backend/plugins/jenkins/tasks/stage_collector.go   |  2 +-
 .../jira/tasks/development_panel_collector.go      |  2 +-
 .../jira/tasks/issue_changelog_collector.go        |  2 +-
 .../plugins/jira/tasks/issue_comment_collector.go  |  2 +-
 backend/plugins/jira/tasks/remotelink_collector.go |  2 +-
 backend/plugins/jira/tasks/worklog_collector.go    |  2 +-
 .../plugins/zentao/tasks/bug_commits_collector.go  |  2 +-
 .../zentao/tasks/story_commits_collector.go        |  2 +-
 .../plugins/zentao/tasks/task_commits_collector.go |  2 +-
 16 files changed, 36 insertions(+), 30 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go 
b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index 1fac444ec..c4b22f110 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -35,10 +35,11 @@ type ApiCollectorStateManager struct {
        RawDataSubTaskArgs
        // *ApiCollector
        // *GraphqlCollector
-       subtasks     []plugin.SubTask
-       newState     models.CollectorLatestState
-       IsIncreamtal bool
-       Since        *time.Time
+       subtasks      []plugin.SubTask
+       newState      models.CollectorLatestState
+       IsIncremental bool
+       Since         *time.Time
+       Before        *time.Time
 }
 
 // NewStatefulApiCollector create a new ApiCollectorStateManager
@@ -69,30 +70,30 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs) 
(*ApiCollectorStateManager
 
        // Calculate incremental and since based on syncPolicy and old state
        syncPolicy := args.Ctx.TaskContext().SyncPolicy()
-       var isIncreamtal bool
+       var isIncremental bool
        var since *time.Time
 
        if syncPolicy == nil {
                // 1. If no syncPolicy, incremental and since is 
oldState.LatestSuccessStart
-               isIncreamtal = true
+               isIncremental = true
                since = oldLatestSuccessStart
        } else if oldLatestSuccessStart == nil {
                // 2. If no oldState.LatestSuccessStart, not incremental and 
since is syncPolicy.TimeAfter
-               isIncreamtal = false
+               isIncremental = false
                since = syncPolicy.TimeAfter
        } else if syncPolicy.FullSync {
                // 3. If fullSync true, not incremental and since is 
syncPolicy.TimeAfter
-               isIncreamtal = false
+               isIncremental = false
                since = syncPolicy.TimeAfter
        } else if syncPolicy.TimeAfter != nil {
                // 4. If syncPolicy.TimeAfter not nil
                if oldTimeAfter != nil && 
syncPolicy.TimeAfter.Before(*oldTimeAfter) {
                        // 4.1 If oldTimeAfter not nil and syncPolicy.TimeAfter 
before oldTimeAfter, incremental is false and since is syncPolicy.TimeAfter
-                       isIncreamtal = false
+                       isIncremental = false
                        since = syncPolicy.TimeAfter
                } else {
                        // 4.2 If oldTimeAfter nil or syncPolicy.TimeAfter 
after oldTimeAfter, incremental is true and since is oldState.LatestSuccessStart
-                       isIncreamtal = true
+                       isIncremental = true
                        since = oldLatestSuccessStart
                }
        }
@@ -104,8 +105,9 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs) 
(*ApiCollectorStateManager
        return &ApiCollectorStateManager{
                RawDataSubTaskArgs: args,
                newState:           oldState,
-               IsIncreamtal:       isIncreamtal,
+               IsIncremental:      isIncremental,
                Since:              since,
+               Before:             &currentTime,
        }, nil
 
 }
@@ -113,7 +115,7 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs) 
(*ApiCollectorStateManager
 // InitCollector init the embedded collector
 func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs) 
errors.Error {
        args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
-       args.Incremental = args.Incremental || m.IsIncreamtal
+       args.Incremental = args.Incremental || m.IsIncremental
        apiCollector, err := NewApiCollector(args)
        if err != nil {
                return err
@@ -180,7 +182,7 @@ func NewStatefulApiCollectorForFinalizableEntity(args 
FinalizableApiCollectorArg
        }
 
        createdAfter := manager.Since
-       isIncremental := manager.IsIncreamtal
+       isIncremental := manager.IsIncremental
        // step 1: create a collector to collect newly added records
        err = manager.InitCollector(ApiCollectorArgs{
                ApiClient: args.ApiClient,
diff --git a/backend/plugins/bitbucket/tasks/api_common.go 
b/backend/plugins/bitbucket/tasks/api_common.go
index 7e2e76775..8990fdbaf 100644
--- a/backend/plugins/bitbucket/tasks/api_common.go
+++ b/backend/plugins/bitbucket/tasks/api_common.go
@@ -175,7 +175,7 @@ func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *
                        data.Options.FullName, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
                clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", 
*collectorWithState.Since))
        }
 
@@ -199,7 +199,7 @@ func GetIssuesIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *api.Ap
                        data.Options.FullName, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
                clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", 
*collectorWithState.Since))
        }
        // construct the input iterator
@@ -222,7 +222,7 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *api
                        data.Options.FullName, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
                clauses = append(clauses, dal.Where("bitbucket_complete_on > 
?", *collectorWithState.Since))
        }
        // construct the input iterator
diff --git a/backend/plugins/github/tasks/cicd_job_collector.go 
b/backend/plugins/github/tasks/cicd_job_collector.go
index 93617f779..1c9fe63e6 100644
--- a/backend/plugins/github/tasks/cicd_job_collector.go
+++ b/backend/plugins/github/tasks/cicd_job_collector.go
@@ -73,7 +73,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
                        data.Options.GithubId, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
                clauses = append(clauses, dal.Where("github_updated_at > ?", 
collectorWithState.Since))
        }
        cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/github/tasks/pr_commit_collector.go 
b/backend/plugins/github/tasks/pr_commit_collector.go
index 77c0e2a26..25c5402ac 100644
--- a/backend/plugins/github/tasks/pr_commit_collector.go
+++ b/backend/plugins/github/tasks/pr_commit_collector.go
@@ -78,7 +78,7 @@ func CollectApiPullRequestCommits(taskCtx 
plugin.SubTaskContext) errors.Error {
                dal.From(models.GithubPullRequest{}.TableName()),
                dal.Where("repo_id = ? and connection_id=?", 
data.Options.GithubId, data.Options.ConnectionId),
        }
-       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
                clauses = append(
                        clauses,
                        dal.Where("github_updated_at > ?", 
collectorWithState.Since),
diff --git a/backend/plugins/github/tasks/pr_review_collector.go 
b/backend/plugins/github/tasks/pr_review_collector.go
index 6cc8d8ce9..a0f6576ef 100644
--- a/backend/plugins/github/tasks/pr_review_collector.go
+++ b/backend/plugins/github/tasks/pr_review_collector.go
@@ -70,7 +70,7 @@ func CollectApiPullRequestReviews(taskCtx 
plugin.SubTaskContext) errors.Error {
                dal.From(models.GithubPullRequest{}.TableName()),
                dal.Where("repo_id = ? and connection_id=?", 
data.Options.GithubId, data.Options.ConnectionId),
        }
-       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
                clauses = append(
                        clauses,
                        dal.Where("github_updated_at > ?", 
collectorWithState.Since),
diff --git a/backend/plugins/github_graphql/tasks/job_collector.go 
b/backend/plugins/github_graphql/tasks/job_collector.go
index 41836d3a8..7c838f980 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -122,7 +122,7 @@ func CollectGraphqlJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Where("repo_id = ? and connection_id=?", 
data.Options.GithubId, data.Options.ConnectionId),
                dal.Orderby("github_updated_at DESC"),
        }
-       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
                clauses = append(clauses, dal.Where("github_updated_at > ?", 
*collectorWithState.Since))
        }
 
diff --git a/backend/plugins/gitlab/tasks/deployment_collector.go 
b/backend/plugins/gitlab/tasks/deployment_collector.go
index cdd45f712..89c86721a 100644
--- a/backend/plugins/gitlab/tasks/deployment_collector.go
+++ b/backend/plugins/gitlab/tasks/deployment_collector.go
@@ -61,10 +61,14 @@ func CollectDeployment(taskCtx plugin.SubTaskContext) 
errors.Error {
                                return query, err
                        }
                        // https://gitlab.com/gitlab-org/gitlab/-/issues/328500
-                       query.Set("order_by", "created_at")
+                       // 
https://docs.gitlab.com/ee/api/deployments.html#list-project-deployments
+                       query.Set("order_by", "updated_at")
                        if collectorWithState.Since != nil {
                                query.Set("updated_after", 
collectorWithState.Since.Format(time.RFC3339))
                        }
+                       if collectorWithState.Before != nil {
+                               query.Set("updated_before", 
collectorWithState.Before.Format(time.RFC3339))
+                       }
                        return query, nil
                },
                GetTotalPages:  GetTotalPagesFromResponse,
diff --git a/backend/plugins/jenkins/tasks/stage_collector.go 
b/backend/plugins/jenkins/tasks/stage_collector.go
index 47530c95e..4503d8560 100644
--- a/backend/plugins/jenkins/tasks/stage_collector.go
+++ b/backend/plugins/jenkins/tasks/stage_collector.go
@@ -67,7 +67,7 @@ func CollectApiStages(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Where(`tjb.connection_id = ? and tjb.job_path = ? and 
tjb.job_name = ? and tjb.class = ?`,
                        data.Options.ConnectionId, data.Options.JobPath, 
data.Options.JobName, "WorkflowRun"),
        }
-       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
                clauses = append(clauses, dal.Where(`tjb.start_time >= ?`, 
collectorWithState.Since))
        }
        cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/jira/tasks/development_panel_collector.go 
b/backend/plugins/jira/tasks/development_panel_collector.go
index 64a7ea372..c513c5bf5 100644
--- a/backend/plugins/jira/tasks/development_panel_collector.go
+++ b/backend/plugins/jira/tasks/development_panel_collector.go
@@ -72,7 +72,7 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
                dal.Where("bi.connection_id=? and bi.board_id = ?", 
data.Options.ConnectionId, data.Options.BoardId),
        }
-       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
                clauses = append(clauses, dal.Where("i.updated > ?", 
collectorWithState.Since))
        }
        cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/jira/tasks/issue_changelog_collector.go 
b/backend/plugins/jira/tasks/issue_changelog_collector.go
index bcbf46dc0..9f29436c9 100644
--- a/backend/plugins/jira/tasks/issue_changelog_collector.go
+++ b/backend/plugins/jira/tasks/issue_changelog_collector.go
@@ -71,7 +71,7 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
                dal.Where("bi.connection_id=? and bi.board_id = ? AND 
i.std_type != ? and i.changelog_total > 100", data.Options.ConnectionId, 
data.Options.BoardId, "Epic"),
        }
-       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
                clauses = append(clauses, dal.Where("i.updated > ?", 
collectorWithState.Since))
        }
 
diff --git a/backend/plugins/jira/tasks/issue_comment_collector.go 
b/backend/plugins/jira/tasks/issue_comment_collector.go
index 71c91491e..aa6ca2cdd 100644
--- a/backend/plugins/jira/tasks/issue_comment_collector.go
+++ b/backend/plugins/jira/tasks/issue_comment_collector.go
@@ -71,7 +71,7 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
                dal.Where("bi.connection_id=? and bi.board_id = ? AND 
i.std_type != ? AND i.comment_total > 100", data.Options.ConnectionId, 
data.Options.BoardId, "Epic"),
        }
-       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
                clauses = append(clauses, dal.Where("i.updated > ?", 
collectorWithState.Since))
        }
        if logger.IsLevelEnabled(log.LOG_DEBUG) {
diff --git a/backend/plugins/jira/tasks/remotelink_collector.go 
b/backend/plugins/jira/tasks/remotelink_collector.go
index 450723ec2..2072b1f28 100644
--- a/backend/plugins/jira/tasks/remotelink_collector.go
+++ b/backend/plugins/jira/tasks/remotelink_collector.go
@@ -70,7 +70,7 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = 
i.connection_id AND bi.issue_id = i.issue_id)"),
                dal.Where("bi.connection_id=? and bi.board_id = ?", 
data.Options.ConnectionId, data.Options.BoardId),
        }
-       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
                clauses = append(clauses, dal.Where("i.updated > ?", 
collectorWithState.Since))
        }
        cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/jira/tasks/worklog_collector.go 
b/backend/plugins/jira/tasks/worklog_collector.go
index 71121597f..a22005bc3 100644
--- a/backend/plugins/jira/tasks/worklog_collector.go
+++ b/backend/plugins/jira/tasks/worklog_collector.go
@@ -66,7 +66,7 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Where("i.updated > i.created AND bi.connection_id = ?  AND 
bi.board_id = ?  ", data.Options.ConnectionId, data.Options.BoardId),
                dal.Groupby("i.issue_id, i.updated"),
        }
-       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
                clauses = append(clauses, dal.Having("i.updated > ? AND 
(i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND 
COUNT(wl.worklog_id) > 0))", collectorWithState.Since))
        } else {
                /*
diff --git a/backend/plugins/zentao/tasks/bug_commits_collector.go 
b/backend/plugins/zentao/tasks/bug_commits_collector.go
index cb55ceda6..c1ee2f74e 100644
--- a/backend/plugins/zentao/tasks/bug_commits_collector.go
+++ b/backend/plugins/zentao/tasks/bug_commits_collector.go
@@ -70,7 +70,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                        data.Options.ProjectId, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
                clauses = append(clauses, dal.Where("last_edited_date is not 
null and last_edited_date > ?", collectorWithState.Since))
        }
        cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/zentao/tasks/story_commits_collector.go 
b/backend/plugins/zentao/tasks/story_commits_collector.go
index a536f52f1..489ad2945 100644
--- a/backend/plugins/zentao/tasks/story_commits_collector.go
+++ b/backend/plugins/zentao/tasks/story_commits_collector.go
@@ -66,7 +66,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Where(`_tool_zentao_project_stories.project_id = ? and
                        _tool_zentao_project_stories.connection_id = ?`, 
data.Options.ProjectId, data.Options.ConnectionId),
        }
-       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
                clauses = append(clauses, dal.Where("last_edited_date is not 
null and last_edited_date > ?", collectorWithState.Since))
        }
        cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/zentao/tasks/task_commits_collector.go 
b/backend/plugins/zentao/tasks/task_commits_collector.go
index 665069508..5a3204302 100644
--- a/backend/plugins/zentao/tasks/task_commits_collector.go
+++ b/backend/plugins/zentao/tasks/task_commits_collector.go
@@ -64,7 +64,7 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                        data.Options.ProjectId, data.Options.ConnectionId,
                ),
        }
-       if collectorWithState.IsIncreamtal && collectorWithState.Since != nil {
+       if collectorWithState.IsIncremental && collectorWithState.Since != nil {
                clauses = append(clauses, dal.Where("last_edited_date is not 
null and last_edited_date > ?", collectorWithState.Since))
        }
        cursor, err := db.Cursor(clauses...)

Reply via email to