This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch fix#6075
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/fix#6075 by this push:
     new 1f84d11b2 fix: jenkins incremental collectors
1f84d11b2 is described below

commit 1f84d11b2232fdb4cf6db6aac8cc20043a7c1eee
Author: abeizn <[email protected]>
AuthorDate: Thu Sep 14 13:56:56 2023 +0800

    fix: jenkins incremental collectors
---
 backend/plugins/jenkins/tasks/stage_collector.go | 32 ++++++++++++++----------
 1 file changed, 19 insertions(+), 13 deletions(-)

diff --git a/backend/plugins/jenkins/tasks/stage_collector.go b/backend/plugins/jenkins/tasks/stage_collector.go
index 6c8478750..bf159c1d6 100644
--- a/backend/plugins/jenkins/tasks/stage_collector.go
+++ b/backend/plugins/jenkins/tasks/stage_collector.go
@@ -48,15 +48,29 @@ type SimpleBuild struct {
 func CollectApiStages(taskCtx plugin.SubTaskContext) errors.Error {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*JenkinsTaskData)
+
+       collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+               Params: JenkinsApiParams{
+                       ConnectionId: data.Options.ConnectionId,
+                       FullName:     data.Options.JobFullName,
+               },
+               Ctx:   taskCtx,
+               Table: RAW_STAGE_TABLE,
+       })
+       if err != nil {
+               return err
+       }
+
        clauses := []dal.Clause{
                dal.Select("tjb.number,tjb.full_name"),
                dal.From("_tool_jenkins_builds as tjb"),
                dal.Where(`tjb.connection_id = ? and tjb.job_path = ? and tjb.job_name = ? and tjb.class = ?`,
                        data.Options.ConnectionId, data.Options.JobPath, data.Options.JobName, "WorkflowRun"),
        }
-       syncPolicy := taskCtx.TaskContext().SyncPolicy()
-       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
-               clauses = append(clauses, dal.Where(`tjb.start_time >= ?`, syncPolicy.TimeAfter))
+
+       incremental := collectorWithState.IsIncremental()
+       if incremental && collectorWithState.LatestState.LatestSuccessStart != nil {
+               clauses = append(clauses, dal.Where(`tjb.start_time >= ?`, collectorWithState.LatestState.LatestSuccessStart))
        }
 
        cursor, err := db.Cursor(clauses...)
@@ -70,15 +84,7 @@ func CollectApiStages(taskCtx plugin.SubTaskContext) errors.Error {
                return err
        }
 
-       collector, err := api.NewApiCollector(api.ApiCollectorArgs{
-               RawDataSubTaskArgs: api.RawDataSubTaskArgs{
-                       Params: JenkinsApiParams{
-                               ConnectionId: data.Options.ConnectionId,
-                               FullName:     data.Options.JobFullName,
-                       },
-                       Ctx:   taskCtx,
-                       Table: RAW_STAGE_TABLE,
-               },
+       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                Input:       iterator,
                UrlTemplate: fmt.Sprintf("%sjob/%s/{{ .Input.Number }}/wfapi/describe", data.Options.JobPath, data.Options.JobName),
@@ -106,5 +112,5 @@ func CollectApiStages(taskCtx plugin.SubTaskContext) errors.Error {
                return err
        }
 
-       return collector.Execute()
+       return collectorWithState.Execute()
 }

Reply via email to