This is an automated email from the ASF dual-hosted git repository.
likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new a6ed2ee25 feat: github runs collector supports timeFilter/diffSync
(#4442)
a6ed2ee25 is described below
commit a6ed2ee25b1e579f2a004eafa1276053ff2118d8
Author: Klesh Wong <[email protected]>
AuthorDate: Mon Feb 20 15:47:25 2023 +0800
feat: github runs collector supports timeFilter/diffSync (#4442)
---
backend/plugins/github/tasks/cicd_run_collector.go | 131 ++++++++++++++++++---
1 file changed, 112 insertions(+), 19 deletions(-)
diff --git a/backend/plugins/github/tasks/cicd_run_collector.go
b/backend/plugins/github/tasks/cicd_run_collector.go
index 5990f5231..19d6b9d02 100644
--- a/backend/plugins/github/tasks/cicd_run_collector.go
+++ b/backend/plugins/github/tasks/cicd_run_collector.go
@@ -20,16 +20,26 @@ package tasks
import (
"encoding/json"
"fmt"
+ "io"
"net/http"
"net/url"
+ "reflect"
+ "time"
+ "github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+ "github.com/apache/incubator-devlake/plugins/github/models"
)
// RAW_RUN_TABLE is the raw-data table name handed to the runs collectors
// through the Table field of their raw-data subtask args.
const RAW_RUN_TABLE = "github_api_runs"
+// Although the API accepts a maximum of 100 entries per page, sometimes
+// the response body is too large which would lead to request failures
+// https://github.com/apache/incubator-devlake/issues/3199
+const PAGE_SIZE = 30
+
var CollectRunsMeta = plugin.SubTaskMeta{
Name: "collectRuns",
EntryPoint: CollectRuns,
@@ -40,7 +50,7 @@ var CollectRunsMeta = plugin.SubTaskMeta{
func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error {
data := taskCtx.GetData().(*GithubTaskData)
- collectorWithState, err :=
helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
+ collectorWithState, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: GithubApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -52,37 +62,58 @@ func CollectRuns(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- //incremental := collectorWithState.IsIncremental()
+ incremental := collectorWithState.IsIncremental()
+
+ // step 1: fetch records created after createdAfter
+ var createdAfter *time.Time
+ if incremental {
+ createdAfter = collectorWithState.LatestState.LatestSuccessStart
+ } else {
+ createdAfter = data.TimeAfter
+ }
+
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: 30,
- //Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: PAGE_SIZE,
+ Incremental: incremental,
UrlTemplate: "repos/{{ .Params.Name }}/actions/runs",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- // if data.CreatedDateAfter != nil, we set since once
- // There is a bug for github rest api, so temporarily
commented the following code
- //if data.CreatedDateAfter != nil {
- // startDate :=
data.CreatedDateAfter.Format("2006-01-02")
- // query.Set("created", fmt.Sprintf("%s..*",
startDate))
- //}
- //// if incremental == true, we overwrite it
- //if incremental {
- // startDate :=
collectorWithState.LatestState.LatestSuccessStart.Format("2006-01-02")
- // query.Set("created", fmt.Sprintf("%s..*",
startDate))
- //}
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("per_page", fmt.Sprintf("%v",
reqData.Pager.Size))
return query, nil
},
- GetTotalPages: GetTotalPagesFromResponse,
+ // use Undetermined strategy so we can stop fetching further
pages by using
+ // ErrFinishCollect
+ Concurrency: 10,
ResponseParser: func(res *http.Response) ([]json.RawMessage,
errors.Error) {
body := &GithubRawRunsResult{}
err := helper.UnmarshalResponse(res, body)
if err != nil {
return nil, err
}
- return body.GithubWorkflowRuns, nil
+
+ // time filter or diff sync
+ if createdAfter != nil {
+ // if the first record of the page was created
before createdAfter, return an empty set and stop
+ firstRun := &models.GithubRun{}
+ if e :=
json.Unmarshal(body.GithubWorkflowRuns[0], firstRun); e != nil {
+ return nil, errors.Default.Wrap(e,
"failed to unmarshal first run")
+ }
+ if
firstRun.GithubCreatedAt.Before(*createdAfter) {
+ return nil, helper.ErrFinishCollect
+ }
+ // if the last record was created before
createdAfter, return records and stop
+ lastRun := &models.GithubRun{}
+ if e :=
json.Unmarshal(body.GithubWorkflowRuns[len(body.GithubWorkflowRuns)-1],
lastRun); e != nil {
+ return nil, errors.Default.Wrap(e,
"failed to unmarshal last run")
+ }
+ if
lastRun.GithubCreatedAt.Before(*createdAfter) {
+ err = helper.ErrFinishCollect
+ }
+ }
+
+ return body.GithubWorkflowRuns, err
},
})
@@ -90,10 +121,72 @@ func CollectRuns(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ err = collectorWithState.Execute()
+ if err != nil {
+ return err
+ }
+
+ // step 2: for incremental collection, we have to update previously
collected data whose status is unfinished
+ if incremental {
+ // update existing data by collecting unfinished runs prior to
LatestState.LatestSuccessStart
+ return collectUnfinishedRuns(taskCtx)
+ }
+ return nil
}
// GithubRawRunsResult is the envelope that the paged
// "repos/{name}/actions/runs" response is unmarshalled into.
type GithubRawRunsResult struct {
// TotalCount is read from the "total_count" field of the response.
TotalCount int64 `json:"total_count"`
// GithubWorkflowRuns keeps every element of "workflow_runs" undecoded so the
// collector's ResponseParser can return the records as raw messages.
GithubWorkflowRuns []json.RawMessage `json:"workflow_runs"`
}
+
+func collectUnfinishedRuns(taskCtx plugin.SubTaskContext) errors.Error {
+ data := taskCtx.GetData().(*GithubTaskData)
+ db := taskCtx.GetDal()
+
+ // load unfinished runs from the database
+ cursor, err := db.Cursor(
+ dal.Select("id"),
+ dal.From(&models.GithubRun{}),
+ dal.Where(
+ "repo_id = ? AND connection_id = ? AND status IN
('ACTION_REQUIRED', 'STALE', 'IN_PROGRESS', 'QUEUED', 'REQUESTED', 'WAITING',
'PENDING')",
+ data.Options.GithubId, data.Options.ConnectionId,
+ ),
+ )
+ if err != nil {
+ return err
+ }
+ iterator, err := helper.NewDalCursorIterator(db, cursor,
reflect.TypeOf(SimpleGithubRun{}))
+ if err != nil {
+ return err
+ }
+
+ // collect details from api
+ collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
+ RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+ Ctx: taskCtx,
+ Params: GithubApiParams{
+ ConnectionId: data.Options.ConnectionId,
+ Name: data.Options.Name,
+ },
+ Table: RAW_RUN_TABLE,
+ },
+ ApiClient: data.ApiClient,
+ Input: iterator,
+ Incremental: true,
+ UrlTemplate: "repos/{{ .Params.Name }}/actions/runs/{{
.Input.ID }}",
+ ResponseParser: func(res *http.Response) ([]json.RawMessage,
errors.Error) {
+ body, err := io.ReadAll(res.Body)
+ if err != nil {
+ return nil, errors.Convert(err)
+ }
+ res.Body.Close()
+ return []json.RawMessage{body}, nil
+ },
+ AfterResponse: ignoreHTTPStatus404,
+ })
+
+ if err != nil {
+ return err
+ }
+ return collector.Execute()
+}