This is an automated email from the ASF dual-hosted git repository.

likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 4abfcb234 feat: github jobs collector supports diffsync (#4456)
4abfcb234 is described below

commit 4abfcb234b72838c9b9f11b6dd013503d0ed1a67
Author: Klesh Wong <[email protected]>
AuthorDate: Tue Feb 21 21:31:56 2023 +0800

    feat: github jobs collector supports diffsync (#4456)
---
 backend/plugins/github/tasks/cicd_job_collector.go | 51 +++++++++++++++++-----
 1 file changed, 39 insertions(+), 12 deletions(-)

diff --git a/backend/plugins/github/tasks/cicd_job_collector.go b/backend/plugins/github/tasks/cicd_job_collector.go
index bbb080d6f..5db40eebc 100644
--- a/backend/plugins/github/tasks/cicd_job_collector.go
+++ b/backend/plugins/github/tasks/cicd_job_collector.go
@@ -20,14 +20,15 @@ package tasks
 import (
        "encoding/json"
        "fmt"
+       "net/http"
+       "net/url"
+       "reflect"
+
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "github.com/apache/incubator-devlake/plugins/github/models"
-       "net/http"
-       "net/url"
-       "reflect"
 )
 
 const RAW_JOB_TABLE = "github_api_jobs"
@@ -43,11 +44,38 @@ var CollectJobsMeta = plugin.SubTaskMeta{
 func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*GithubTaskData)
-       cursor, err := db.Cursor(
+
+       // state manager
+       collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+               Ctx: taskCtx,
+               Params: GithubApiParams{
+                       ConnectionId: data.Options.ConnectionId,
+                       Name:         data.Options.Name,
+               },
+               Table: RAW_JOB_TABLE,
+       }, data.TimeAfter)
+       if err != nil {
+               return err
+       }
+
+       // load workflow_runs that need jobs collection
+       clauses := []dal.Clause{
                dal.Select("id"),
-               dal.From(models.GithubRun{}.TableName()),
-               dal.Where("repo_id = ? and connection_id = ?", data.Options.GithubId, data.Options.ConnectionId),
-       )
+               dal.From(&models.GithubRun{}),
+               dal.Where(
+                       "repo_id = ? AND connection_id = ?",
+                       data.Options.GithubId, data.Options.ConnectionId,
+               ),
+       }
+       // incremental collection
+       incremental := collectorWithState.IsIncremental()
+       if incremental {
+               clauses = append(
+                       clauses,
+                       dal.Where("github_updated_at > ?", collectorWithState.LatestState.LatestSuccessStart),
+               )
+       }
+       cursor, err := db.Cursor(clauses...)
        if err != nil {
                return err
        }
@@ -55,8 +83,8 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
        if err != nil {
                return err
        }
-
-       collector, err := api.NewApiCollector(api.ApiCollectorArgs{
+       // collect jobs
+       err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                RawDataSubTaskArgs: api.RawDataSubTaskArgs{
                        Ctx: taskCtx,
                        Params: GithubApiParams{
@@ -68,7 +96,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
                ApiClient:   data.ApiClient,
                PageSize:    100,
                Input:       iterator,
-               Incremental: false,
+               Incremental: incremental,
                 UrlTemplate: "repos/{{ .Params.Name }}/actions/runs/{{ .Input.ID }}/jobs",
                 Query: func(reqData *api.RequestData) (url.Values, errors.Error) {
                        query := url.Values{}
@@ -87,11 +115,10 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
                },
                AfterResponse: ignoreHTTPStatus404,
        })
-
        if err != nil {
                return err
        }
-       return collector.Execute()
+       return collectorWithState.Execute()
 }
 
 type SimpleGithubRun struct {

Reply via email to