This is an automated email from the ASF dual-hosted git repository.

likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new a6ed2ee25 feat: github runs collector supports timeFilter/diffSync (#4442)
a6ed2ee25 is described below

commit a6ed2ee25b1e579f2a004eafa1276053ff2118d8
Author: Klesh Wong <[email protected]>
AuthorDate: Mon Feb 20 15:47:25 2023 +0800

    feat: github runs collector supports timeFilter/diffSync (#4442)
---
 backend/plugins/github/tasks/cicd_run_collector.go | 131 ++++++++++++++++++---
 1 file changed, 112 insertions(+), 19 deletions(-)

diff --git a/backend/plugins/github/tasks/cicd_run_collector.go b/backend/plugins/github/tasks/cicd_run_collector.go
index 5990f5231..19d6b9d02 100644
--- a/backend/plugins/github/tasks/cicd_run_collector.go
+++ b/backend/plugins/github/tasks/cicd_run_collector.go
@@ -20,16 +20,26 @@ package tasks
 import (
        "encoding/json"
        "fmt"
+       "io"
        "net/http"
        "net/url"
+       "reflect"
+       "time"
 
+       "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
        helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+       "github.com/apache/incubator-devlake/plugins/github/models"
 )
 
 const RAW_RUN_TABLE = "github_api_runs"
 
+// Although the API accepts a maximum of 100 entries per page, sometimes
+// the response body is too large which would lead to request failures
+// https://github.com/apache/incubator-devlake/issues/3199
+const PAGE_SIZE = 30
+
 var CollectRunsMeta = plugin.SubTaskMeta{
        Name:             "collectRuns",
        EntryPoint:       CollectRuns,
@@ -40,7 +50,7 @@ var CollectRunsMeta = plugin.SubTaskMeta{
 
 func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error {
        data := taskCtx.GetData().(*GithubTaskData)
-       collectorWithState, err := 
helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
+       collectorWithState, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: GithubApiParams{
                        ConnectionId: data.Options.ConnectionId,
@@ -52,37 +62,58 @@ func CollectRuns(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       //incremental := collectorWithState.IsIncremental()
+       incremental := collectorWithState.IsIncremental()
+
+       // step 1: fetch records created after createdAfter
+       var createdAfter *time.Time
+       if incremental {
+               createdAfter = collectorWithState.LatestState.LatestSuccessStart
+       } else {
+               createdAfter = data.TimeAfter
+       }
+
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               ApiClient: data.ApiClient,
-               PageSize:  30,
-               //Incremental: incremental,
+               ApiClient:   data.ApiClient,
+               PageSize:    PAGE_SIZE,
+               Incremental: incremental,
                UrlTemplate: "repos/{{ .Params.Name }}/actions/runs",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
-                       // if data.CreatedDateAfter != nil, we set since once
-                       // There is a bug for github rest api, so temporarily 
commented the following code
-                       //if data.CreatedDateAfter != nil {
-                       //      startDate := 
data.CreatedDateAfter.Format("2006-01-02")
-                       //      query.Set("created", fmt.Sprintf("%s..*", 
startDate))
-                       //}
-                       //// if incremental == true, we overwrite it
-                       //if incremental {
-                       //      startDate := 
collectorWithState.LatestState.LatestSuccessStart.Format("2006-01-02")
-                       //      query.Set("created", fmt.Sprintf("%s..*", 
startDate))
-                       //}
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("per_page", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        return query, nil
                },
-               GetTotalPages: GetTotalPagesFromResponse,
+               // use Undetermined strategy so we can stop fetching further 
pages by using
+               // ErrFinishCollect
+               Concurrency: 10,
                ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
                        body := &GithubRawRunsResult{}
                        err := helper.UnmarshalResponse(res, body)
                        if err != nil {
                                return nil, err
                        }
-                       return body.GithubWorkflowRuns, nil
+
+                       // Time filter or diff sync: signal ErrFinishCollect once the page reaches
+                       // runs created before createdAfter (assumes the API lists runs newest
+                       // first - confirm). The checks read the first and last run of the page,
+                       // so they only apply to a non-empty page; an empty page is returned
+                       // as-is, exactly like the unfiltered path, instead of panicking on [0].
+                       if createdAfter != nil && len(body.GithubWorkflowRuns) > 0 {
+                               // if the first record of the page was created before createdAfter,
+                               // return an empty set and stop
+                               firstRun := &models.GithubRun{}
+                               if e := json.Unmarshal(body.GithubWorkflowRuns[0], firstRun); e != nil {
+                                       return nil, errors.Default.Wrap(e, "failed to unmarshal first run")
+                               }
+                               if firstRun.GithubCreatedAt.Before(*createdAfter) {
+                                       return nil, helper.ErrFinishCollect
+                               }
+                               // if the last record was created before createdAfter, return the
+                               // records of this page and stop
+                               lastRun := &models.GithubRun{}
+                               if e := json.Unmarshal(body.GithubWorkflowRuns[len(body.GithubWorkflowRuns)-1], lastRun); e != nil {
+                                       return nil, errors.Default.Wrap(e, "failed to unmarshal last run")
+                               }
+                               if lastRun.GithubCreatedAt.Before(*createdAfter) {
+                                       err = helper.ErrFinishCollect
+                               }
+                       }
+
+                       return body.GithubWorkflowRuns, err
                },
        })
 
@@ -90,10 +121,72 @@ func CollectRuns(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       return collectorWithState.Execute()
+       err = collectorWithState.Execute()
+       if err != nil {
+               return err
+       }
+
+       // step 2: for incremental collection, we have to update previous 
collected data which status is unfinished
+       if incremental {
+               // update existing data by collecting unfinished runs prior to 
LatestState.LatestSuccessStart
+               return collectUnfinishedRuns(taskCtx)
+       }
+       return nil
 }
 
+// GithubRawRunsResult is the body decoded from the "repos/{name}/actions/runs"
+// list response; CollectRuns passes only workflow_runs on as raw records.
 type GithubRawRunsResult struct {
        TotalCount         int64             `json:"total_count"`
        GithubWorkflowRuns []json.RawMessage `json:"workflow_runs"`
 }
+
+// collectUnfinishedRuns re-collects, one API request per run, every run stored
+// for this repo/connection whose status is one of the non-terminal values listed
+// below, writing the responses into RAW_RUN_TABLE. CollectRuns calls it only for
+// incremental collection, after the regular page-by-page collection succeeds.
+func collectUnfinishedRuns(taskCtx plugin.SubTaskContext) errors.Error {
+       data := taskCtx.GetData().(*GithubTaskData)
+       db := taskCtx.GetDal()
+
+       // load unfinished runs from the database
+       // NOTE(review): GitHub's REST API documents run `status` values in lowercase,
+       // and 'action_required'/'stale' are `conclusion` values; this filter depends on
+       // how the extractor stores status and on the database collation - confirm.
+       cursor, err := db.Cursor(
+               dal.Select("id"),
+               dal.From(&models.GithubRun{}),
+               dal.Where(
+                       "repo_id = ? AND connection_id = ? AND status IN ('ACTION_REQUIRED', 'STALE', 'IN_PROGRESS', 'QUEUED', 'REQUESTED', 'WAITING', 'PENDING')",
+                       data.Options.GithubId, data.Options.ConnectionId,
+               ),
+       )
+       if err != nil {
+               return err
+       }
+       iterator, err := helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(SimpleGithubRun{}))
+       if err != nil {
+               return err
+       }
+
+       // collect details from api
+       collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
+               RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+                       Ctx: taskCtx,
+                       Params: GithubApiParams{
+                               ConnectionId: data.Options.ConnectionId,
+                               Name:         data.Options.Name,
+                       },
+                       Table: RAW_RUN_TABLE,
+               },
+               ApiClient:   data.ApiClient,
+               Input:       iterator,
+               Incremental: true,
+               UrlTemplate: "repos/{{ .Params.Name }}/actions/runs/{{ .Input.ID }}",
+               // The whole response body is stored as a single raw record.
+               ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
+                       // Close the body on every path, including a failed read.
+                       defer res.Body.Close()
+                       body, err := io.ReadAll(res.Body)
+                       if err != nil {
+                               return nil, errors.Convert(err)
+                       }
+                       return []json.RawMessage{body}, nil
+               },
+               AfterResponse: ignoreHTTPStatus404,
+       })
+
+       if err != nil {
+               return err
+       }
+       return collector.Execute()
+}

Reply via email to