likyh commented on code in PR #4442:
URL: 
https://github.com/apache/incubator-devlake/pull/4442#discussion_r1109806694


##########
backend/plugins/github/tasks/cicd_run_collector.go:
##########
@@ -52,48 +62,131 @@ func CollectRuns(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       //incremental := collectorWithState.IsIncremental()
+       incremental := collectorWithState.IsIncremental()
+
+       // step 1: fetch records created after createdAfter
+       var createdAfter *time.Time
+       if incremental {
+               createdAfter = collectorWithState.LatestState.LatestSuccessStart
+       } else {
+               createdAfter = data.TimeAfter
+       }
+
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               ApiClient: data.ApiClient,
-               PageSize:  30,
-               //Incremental: incremental,
+               ApiClient:   data.ApiClient,
+               PageSize:    PAGE_SIZE,
+               Incremental: incremental,
                UrlTemplate: "repos/{{ .Params.Name }}/actions/runs",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
-                       // if data.CreatedDateAfter != nil, we set since once
-                       // There is a bug for github rest api, so temporarily 
commented the following code
-                       //if data.CreatedDateAfter != nil {
-                       //      startDate := 
data.CreatedDateAfter.Format("2006-01-02")
-                       //      query.Set("created", fmt.Sprintf("%s..*", 
startDate))
-                       //}
-                       //// if incremental == true, we overwrite it
-                       //if incremental {
-                       //      startDate := 
collectorWithState.LatestState.LatestSuccessStart.Format("2006-01-02")
-                       //      query.Set("created", fmt.Sprintf("%s..*", 
startDate))
-                       //}
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("per_page", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        return query, nil
                },
-               GetTotalPages: GetTotalPagesFromResponse,
+               // use Undetermined strategy so we can stop fetching further 
pages by using
+               // ErrFinishCollect
+               Concurrency: 10,
                ResponseParser: func(res *http.Response) ([]json.RawMessage, 
errors.Error) {
                        body := &GithubRawRunsResult{}
                        err := helper.UnmarshalResponse(res, body)
                        if err != nil {
                                return nil, err
                        }
-                       return body.GithubWorkflowRuns, nil
+
+                       // time filter or diff sync
+                       if createdAfter != nil {
+                               // if the first record of the page was created 
before minCreated, return emtpy set and stop
+                               firstRun := &models.GithubRun{}
+                               if e := 
json.Unmarshal(body.GithubWorkflowRuns[0], firstRun); e != nil {
+                                       return nil, errors.Default.Wrap(e, 
"failed to unmarshal first run")
+                               }
+                               if 
firstRun.GithubCreatedAt.Before(*createdAfter) {
+                                       return nil, helper.ErrFinishCollect
+                               }
+                               // if the last record was created before 
minCreated, return records and stop
+                               lastRun := &models.GithubRun{}
+                               if e := 
json.Unmarshal(body.GithubWorkflowRuns[len(body.GithubWorkflowRuns)-1], 
lastRun); e != nil {
+                                       return nil, errors.Default.Wrap(e, 
"failed to unmarshal last run")
+                               }
+                               if 
lastRun.GithubCreatedAt.Before(*createdAfter) {
+                                       err = helper.ErrFinishCollect
+                               }
+                       }
+
+                       return body.GithubWorkflowRuns, err
                },
        })
 
        if err != nil {
                return err
        }
 
-       return collectorWithState.Execute()
+       err = collectorWithState.Execute()
+       if err != nil {
+               return err
+       }
+
+       // step 2: for incremental collection, we have to update previous 
collected data which status is unfinished
+       if incremental {
+               // update existing data by collecting unfinished runs prior to 
LatestState.LatestSuccessStart
+               return collectUnfinishedRuns(taskCtx)
+       }
+       return nil
 }
 
 type GithubRawRunsResult struct {
        TotalCount         int64             `json:"total_count"`
        GithubWorkflowRuns []json.RawMessage `json:"workflow_runs"`
 }
+
+func collectUnfinishedRuns(taskCtx plugin.SubTaskContext) errors.Error {
+       data := taskCtx.GetData().(*GithubTaskData)
+       db := taskCtx.GetDal()
+
+       // load unfinished runs from the database
+       cursor, err := db.Cursor(
+               dal.Select("id"),
+               dal.From(&models.GithubRun{}),
+               dal.Where(
+                       "repo_id = ? AND connection_id = ? AND status IN 
('ACTION_REQUIRED', 'STALE', 'IN_PROGRESS', 'QUEUED', 'REQUESTED', 'WAITING', 
'PENDING')",
+                       data.Options.GithubId, data.Options.ConnectionId,
+               ),
+       )
+       if err != nil {
+               return err
+       }
+       iterator, err := helper.NewDalCursorIterator(db, cursor, 
reflect.TypeOf(SimpleGithubRun{}))
+       if err != nil {
+               return err
+       }
+
+       // collect details from api
+       collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
+               RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+                       Ctx: taskCtx,
+                       Params: GithubApiParams{
+                               ConnectionId: data.Options.ConnectionId,
+                               Name:         data.Options.Name,
+                       },
+                       Table: RAW_RUN_TABLE,
+               },
+               ApiClient:   data.ApiClient,
+               Input:       iterator,
+               Incremental: true,
+               UrlTemplate: "repos/{{ .Params.Name }}/actions/runs/{{ 
.Input.ID }}",

Review Comment:
   How many runs will this collector fetch one by one? Each unfinished run becomes a separate API request.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to