klesh commented on code in PR #4442:
URL: 
https://github.com/apache/incubator-devlake/pull/4442#discussion_r1110020381


##########
backend/plugins/github/tasks/cicd_run_collector.go:
##########
@@ -52,48 +62,131 @@ func CollectRuns(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       //incremental := collectorWithState.IsIncremental()
+       incremental := collectorWithState.IsIncremental()
+
+       // step 1: fetch records created after createdAfter
+       var createdAfter *time.Time
+       if incremental {
+               createdAfter = collectorWithState.LatestState.LatestSuccessStart
+       } else {
+               createdAfter = data.TimeAfter
+       }
+
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               ApiClient: data.ApiClient,
-               PageSize:  30,
-               //Incremental: incremental,
+               ApiClient:   data.ApiClient,
+               PageSize:    PAGE_SIZE,
+               Incremental: incremental,
                UrlTemplate: "repos/{{ .Params.Name }}/actions/runs",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
-                       // if data.CreatedDateAfter != nil, we set since once
-                       // There is a bug for github rest api, so temporarily 
commented the following code
-                       //if data.CreatedDateAfter != nil {
-                       //      startDate := 
data.CreatedDateAfter.Format("2006-01-02")
-                       //      query.Set("created", fmt.Sprintf("%s..*", 
startDate))
-                       //}
-                       //// if incremental == true, we overwrite it
-                       //if incremental {
-                       //      startDate := 
collectorWithState.LatestState.LatestSuccessStart.Format("2006-01-02")
-                       //      query.Set("created", fmt.Sprintf("%s..*", 
startDate))
-                       //}
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("per_page", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        return query, nil
                },
-               GetTotalPages: GetTotalPagesFromResponse,
+               // use Undetermined strategy so we can stop fetching further 
pages by using
+               // ErrFinishCollect
+               Concurrency: 10,
                ResponseParser: func(res *http.Response) ([]json.RawMessage, 
errors.Error) {
                        body := &GithubRawRunsResult{}
                        err := helper.UnmarshalResponse(res, body)
                        if err != nil {
                                return nil, err
                        }
-                       return body.GithubWorkflowRuns, nil
+
+                       // time filter or diff sync
+                       if createdAfter != nil {
+                               // if the first record of the page was created 
before minCreated, return emtpy set and stop
+                               firstRun := &models.GithubRun{}
+                               if e := 
json.Unmarshal(body.GithubWorkflowRuns[0], firstRun); e != nil {
+                                       return nil, errors.Default.Wrap(e, 
"failed to unmarshal first run")
+                               }
+                               if 
firstRun.GithubCreatedAt.Before(*createdAfter) {
+                                       return nil, helper.ErrFinishCollect
+                               }
+                               // if the last record was created before 
minCreated, return records and stop
+                               lastRun := &models.GithubRun{}
+                               if e := 
json.Unmarshal(body.GithubWorkflowRuns[len(body.GithubWorkflowRuns)-1], 
lastRun); e != nil {
+                                       return nil, errors.Default.Wrap(e, 
"failed to unmarshal last run")
+                               }
+                               if 
lastRun.GithubCreatedAt.Before(*createdAfter) {
+                                       err = helper.ErrFinishCollect
+                               }
+                       }
+
+                       return body.GithubWorkflowRuns, err
                },
        })
 
        if err != nil {
                return err
        }
 
-       return collectorWithState.Execute()
+       err = collectorWithState.Execute()
+       if err != nil {
+               return err
+       }
+
+       // step 2: for incremental collection, we have to update previous 
collected data which status is unfinished
+       if incremental {
+               // update existing data by collecting unfinished runs prior to 
LatestState.LatestSuccessStart
+               return collectUnfinishedRuns(taskCtx)
+       }
+       return nil
 }
 
 type GithubRawRunsResult struct {
        TotalCount         int64             `json:"total_count"`
        GithubWorkflowRuns []json.RawMessage `json:"workflow_runs"`
 }
+
+func collectUnfinishedRuns(taskCtx plugin.SubTaskContext) errors.Error {
+       data := taskCtx.GetData().(*GithubTaskData)
+       db := taskCtx.GetDal()
+
+       // load unfinished runs from the database
+       cursor, err := db.Cursor(
+               dal.Select("id"),
+               dal.From(&models.GithubRun{}),
+               dal.Where(
+                       "repo_id = ? AND connection_id = ? AND status IN 
('ACTION_REQUIRED', 'STALE', 'IN_PROGRESS', 'QUEUED', 'REQUESTED', 'WAITING', 
'PENDING')",

Review Comment:
   TL;DR: No, it won't. We expect `raw_table` to have multiple records for 
one run.
   
   
   We are actually counting on this behavior. The extractor works in a 
`CreateOrUpdate` manner so that `Incremental Collection` can function 
correctly: because we delete the whole BATCH before extraction, we have to 
keep the OLDER data in the `raw_table`, since incremental collection only 
fetches the NEWER data.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to