This is an automated email from the ASF dual-hosted git repository.
likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 4abfcb234 feat: github jobs collector supports diffsync (#4456)
4abfcb234 is described below
commit 4abfcb234b72838c9b9f11b6dd013503d0ed1a67
Author: Klesh Wong <[email protected]>
AuthorDate: Tue Feb 21 21:31:56 2023 +0800
feat: github jobs collector supports diffsync (#4456)
---
backend/plugins/github/tasks/cicd_job_collector.go | 51 +++++++++++++++++-----
1 file changed, 39 insertions(+), 12 deletions(-)
diff --git a/backend/plugins/github/tasks/cicd_job_collector.go b/backend/plugins/github/tasks/cicd_job_collector.go
index bbb080d6f..5db40eebc 100644
--- a/backend/plugins/github/tasks/cicd_job_collector.go
+++ b/backend/plugins/github/tasks/cicd_job_collector.go
@@ -20,14 +20,15 @@ package tasks
import (
"encoding/json"
"fmt"
+ "net/http"
+ "net/url"
+ "reflect"
+
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/github/models"
- "net/http"
- "net/url"
- "reflect"
)
const RAW_JOB_TABLE = "github_api_jobs"
@@ -43,11 +44,38 @@ var CollectJobsMeta = plugin.SubTaskMeta{
func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*GithubTaskData)
- cursor, err := db.Cursor(
+
+ // state manager
+ collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+ Ctx: taskCtx,
+ Params: GithubApiParams{
+ ConnectionId: data.Options.ConnectionId,
+ Name: data.Options.Name,
+ },
+ Table: RAW_JOB_TABLE,
+ }, data.TimeAfter)
+ if err != nil {
+ return err
+ }
+
+ // load workflow_runs that need jobs collection
+ clauses := []dal.Clause{
dal.Select("id"),
- dal.From(models.GithubRun{}.TableName()),
- dal.Where("repo_id = ? and connection_id = ?", data.Options.GithubId, data.Options.ConnectionId),
- )
+ dal.From(&models.GithubRun{}),
+ dal.Where(
+ "repo_id = ? AND connection_id = ?",
+ data.Options.GithubId, data.Options.ConnectionId,
+ ),
+ }
+ // incremental collection
+ incremental := collectorWithState.IsIncremental()
+ if incremental {
+ clauses = append(
+ clauses,
+ dal.Where("github_updated_at > ?", collectorWithState.LatestState.LatestSuccessStart),
+ )
+ }
+ cursor, err := db.Cursor(clauses...)
if err != nil {
return err
}
@@ -55,8 +83,8 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
if err != nil {
return err
}
-
- collector, err := api.NewApiCollector(api.ApiCollectorArgs{
+ // collect jobs
+ err = collectorWithState.InitCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: api.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: GithubApiParams{
@@ -68,7 +96,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
ApiClient: data.ApiClient,
PageSize: 100,
Input: iterator,
- Incremental: false,
+ Incremental: incremental,
UrlTemplate: "repos/{{ .Params.Name }}/actions/runs/{{ .Input.ID }}/jobs",
Query: func(reqData *api.RequestData) (url.Values, errors.Error) {
query := url.Values{}
@@ -87,11 +115,10 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
},
AfterResponse: ignoreHTTPStatus404,
})
-
if err != nil {
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}
type SimpleGithubRun struct {