klesh commented on code in PR #4442:
URL:
https://github.com/apache/incubator-devlake/pull/4442#discussion_r1110020381
##########
backend/plugins/github/tasks/cicd_run_collector.go:
##########
@@ -52,48 +62,131 @@ func CollectRuns(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- //incremental := collectorWithState.IsIncremental()
+ incremental := collectorWithState.IsIncremental()
+
+ // step 1: fetch records created after createdAfter
+ var createdAfter *time.Time
+ if incremental {
+ createdAfter = collectorWithState.LatestState.LatestSuccessStart
+ } else {
+ createdAfter = data.TimeAfter
+ }
+
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: 30,
- //Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: PAGE_SIZE,
+ Incremental: incremental,
UrlTemplate: "repos/{{ .Params.Name }}/actions/runs",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- // if data.CreatedDateAfter != nil, we set since once
- // There is a bug for github rest api, so temporarily
commented the following code
- //if data.CreatedDateAfter != nil {
- // startDate :=
data.CreatedDateAfter.Format("2006-01-02")
- // query.Set("created", fmt.Sprintf("%s..*",
startDate))
- //}
- //// if incremental == true, we overwrite it
- //if incremental {
- // startDate :=
collectorWithState.LatestState.LatestSuccessStart.Format("2006-01-02")
- // query.Set("created", fmt.Sprintf("%s..*",
startDate))
- //}
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("per_page", fmt.Sprintf("%v",
reqData.Pager.Size))
return query, nil
},
- GetTotalPages: GetTotalPagesFromResponse,
+ // use Undetermined strategy so we can stop fetching further
pages by using
+ // ErrFinishCollect
+ Concurrency: 10,
ResponseParser: func(res *http.Response) ([]json.RawMessage,
errors.Error) {
body := &GithubRawRunsResult{}
err := helper.UnmarshalResponse(res, body)
if err != nil {
return nil, err
}
- return body.GithubWorkflowRuns, nil
+
+ // time filter or diff sync
+ if createdAfter != nil {
+ // if the first record of the page was created
before minCreated, return emtpy set and stop
+ firstRun := &models.GithubRun{}
+ if e :=
json.Unmarshal(body.GithubWorkflowRuns[0], firstRun); e != nil {
+ return nil, errors.Default.Wrap(e,
"failed to unmarshal first run")
+ }
+ if
firstRun.GithubCreatedAt.Before(*createdAfter) {
+ return nil, helper.ErrFinishCollect
+ }
+ // if the last record was created before
minCreated, return records and stop
+ lastRun := &models.GithubRun{}
+ if e :=
json.Unmarshal(body.GithubWorkflowRuns[len(body.GithubWorkflowRuns)-1],
lastRun); e != nil {
+ return nil, errors.Default.Wrap(e,
"failed to unmarshal last run")
+ }
+ if
lastRun.GithubCreatedAt.Before(*createdAfter) {
+ err = helper.ErrFinishCollect
+ }
+ }
+
+ return body.GithubWorkflowRuns, err
},
})
if err != nil {
return err
}
- return collectorWithState.Execute()
+ err = collectorWithState.Execute()
+ if err != nil {
+ return err
+ }
+
+ // step 2: for incremental collection, we have to update previous
collected data which status is unfinished
+ if incremental {
+ // update existing data by collecting unfinished runs prior to
LatestState.LatestSuccessStart
+ return collectUnfinishedRuns(taskCtx)
+ }
+ return nil
}
type GithubRawRunsResult struct {
TotalCount int64 `json:"total_count"`
GithubWorkflowRuns []json.RawMessage `json:"workflow_runs"`
}
+
+func collectUnfinishedRuns(taskCtx plugin.SubTaskContext) errors.Error {
+ data := taskCtx.GetData().(*GithubTaskData)
+ db := taskCtx.GetDal()
+
+ // load unfinished runs from the database
+ cursor, err := db.Cursor(
+ dal.Select("id"),
+ dal.From(&models.GithubRun{}),
+ dal.Where(
+ "repo_id = ? AND connection_id = ? AND status IN
('ACTION_REQUIRED', 'STALE', 'IN_PROGRESS', 'QUEUED', 'REQUESTED', 'WAITING',
'PENDING')",
Review Comment:
TL;DR: No, it won't; we expect `raw_table` to hold multiple records
for one run.
We are actually counting on this behavior: the extractor works in a
`CreateOrUpdate` manner so that `Incremental Collection` can function correctly.
Because we delete the whole BATCH before extraction, we have to keep the
OLDER data in the `raw_table`, since incremental collection only fetches the
NEWER data.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]