This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new dc50e92ba feat: add increment support for graphql collector (#4491)
dc50e92ba is described below

commit dc50e92bac0a4ec84ec50df3f64283f30a0700f9
Author: Likyh <[email protected]>
AuthorDate: Thu Feb 23 17:54:07 2023 +0800

    feat: add increment support for graphql collector (#4491)
    
    * feat: add increment support for graphql collector
    
    * refactor: change collector name from CollectCheckRun to CollectGraphqlJobs
    
    * fix: address review feedback
    
    * fix: use filterBy to support incremental collection for issues; add some comments for PRs
---
 .../helpers/pluginhelper/api/graphql_collector.go  | 153 ++++++++++++++++-----
 backend/plugins/github_graphql/impl/impl.go        |   2 +-
 .../github_graphql/tasks/issue_collector.go        |  39 ++++--
 .../{check_run_collector.go => job_collector.go}   |  26 ++--
 .../plugins/github_graphql/tasks/pr_collector.go   |  43 +++---
 5 files changed, 186 insertions(+), 77 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/graphql_collector.go b/backend/helpers/pluginhelper/api/graphql_collector.go
index e1753c90f..de364cfd6 100644
--- a/backend/helpers/pluginhelper/api/graphql_collector.go
+++ b/backend/helpers/pluginhelper/api/graphql_collector.go
@@ -27,6 +27,7 @@ import (
        "github.com/merico-dev/graphql"
        "net/http"
        "reflect"
+       "time"
 )
 
 // CursorPager contains pagination information for a graphql request
@@ -49,6 +50,10 @@ type GraphqlQueryPageInfo struct {
        HasNextPage bool   `json:"hasNextPage"`
 }
 
+// DateTime is the time type used in GraphQL queries;
+// the graphql library can only read this exact type name
+type DateTime struct{ time.Time }
+
 // GraphqlAsyncResponseHandler callback function to handle the Response asynchronously
 type GraphqlAsyncResponseHandler func(res *http.Response) error
 
@@ -203,6 +208,11 @@ func (collector *GraphqlCollector) exec(divider *BatchSaveDivider, input interfa
                SkipCursor: nil,
                Size:       collector.args.PageSize,
        }
+       err = collector.ExtractExistRawData(divider, reqData)
+       if err != nil {
+               collector.checkError(err)
+               return
+       }
        if collector.args.GetPageInfo != nil {
                collector.fetchOneByOne(divider, reqData)
        } else {
@@ -210,7 +220,7 @@ func (collector *GraphqlCollector) exec(divider *BatchSaveDivider, input interfa
        }
 }
 
-// fetchPagesDetermined fetches data of all pages for APIs that return paging information
+// fetchOneByOne fetches data of all pages for APIs that return paging information
 func (collector *GraphqlCollector) fetchOneByOne(divider *BatchSaveDivider, reqData *GraphqlRequestData) {
        // fetch first page
        var fetchNextPage func(query interface{}) errors.Error
@@ -241,6 +251,110 @@ func (collector *GraphqlCollector) fetchOneByOne(divider *BatchSaveDivider, reqD
        collector.fetchAsync(divider, reqData, fetchNextPage)
 }
 
+// BatchSaveWithOrigin saves the results and fills in the raw data origin for them
+func (collector *GraphqlCollector) BatchSaveWithOrigin(divider *BatchSaveDivider, results []interface{}, row *RawData) errors.Error {
+       // batch save divider
+       RAW_DATA_ORIGIN := "RawDataOrigin"
+       for _, result := range results {
+               // get the batch operator for the specific type
+               batch, err := divider.ForType(reflect.TypeOf(result))
+               if err != nil {
+                       return err
+               }
+               // set raw data origin field
+               origin := reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN)
+               if origin.IsValid() && origin.IsZero() {
+                       origin.Set(reflect.ValueOf(common.RawDataOrigin{
+                               RawDataTable:  collector.table,
+                               RawDataId:     row.ID,
+                               RawDataParams: row.Params,
+                       }))
+               }
+               // records are saved into the db when the batch slots are maxed out
+               err = batch.Add(result)
+               if err != nil {
+                       return errors.Default.Wrap(err, "error adding result to batch")
+               }
+       }
+       return nil
+}
+
+// ExtractExistRawData extracts data from existing raw-layer records when collecting incrementally
+func (collector *GraphqlCollector) ExtractExistRawData(divider *BatchSaveDivider, reqData *GraphqlRequestData) errors.Error {
+       // load data from database
+       db := collector.args.Ctx.GetDal()
+       logger := collector.args.Ctx.GetLogger()
+
+       clauses := []dal.Clause{
+               dal.From(collector.table),
+               dal.Where("params = ?", collector.params),
+               dal.Orderby("id ASC"),
+       }
+
+       count, err := db.Count(clauses...)
+       if err != nil {
+               return errors.Default.Wrap(err, "error getting count of clauses")
+       }
+       cursor, err := db.Cursor(clauses...)
+       if err != nil {
+               return errors.Default.Wrap(err, "error running DB query")
+       }
+       logger.Info("get data from %s where params=%s and got %d", collector.table, collector.params, count)
+       defer cursor.Close()
+       row := &RawData{}
+
+       // get the type of query and variables
+       query, variables, _ := collector.args.BuildQuery(reqData)
+
+       // progress
+       collector.args.Ctx.SetProgress(0, -1)
+       ctx := collector.args.Ctx.GetContext()
+       // iterate all rows
+       for cursor.Next() {
+               select {
+               case <-ctx.Done():
+                       return errors.Convert(ctx.Err())
+               default:
+               }
+               err = db.Fetch(cursor, row)
+               if err != nil {
+                       return errors.Default.Wrap(err, "error fetching row")
+               }
+
+               err = errors.Convert(json.Unmarshal(row.Data, &query))
+               if err != nil {
+                       return errors.Default.Wrap(err, `graphql collector unmarshal query failed`)
+               }
+               err = errors.Convert(json.Unmarshal(row.Input, &variables))
+               if err != nil {
+                       return errors.Default.Wrap(err, `variables in graphql query can not unmarshal from json`)
+               }
+
+               var results []interface{}
+               if collector.args.ResponseParserWithDataErrors != nil {
+                       results, err = errors.Convert01(collector.args.ResponseParserWithDataErrors(query, variables, nil))
+               } else {
+                       results, err = errors.Convert01(collector.args.ResponseParser(query, variables))
+               }
+               if err != nil {
+                       if errors.Is(err, ErrFinishCollect) {
+                               logger.Info("existing data parser returned ErrFinishCollect; skipping. rawId: #%d", row.ID)
+                       } else {
+                               return errors.Default.Wrap(err, "error calling plugin Extract implementation")
+                       }
+               }
+               err = collector.BatchSaveWithOrigin(divider, results, row)
+               if err != nil {
+                       return err
+               }
+
+               collector.args.Ctx.IncProgress(1)
+       }
+
+       // save the last batches
+       return divider.Close()
+}
+
 func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData *GraphqlRequestData, handler func(query interface{}) errors.Error) {
        if reqData.Pager == nil {
                reqData.Pager = &CursorPager{
@@ -300,10 +414,8 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
                return
        }
 
-       var (
-               results []interface{}
-       )
-       if len(dataErrors) > 0 || collector.args.ResponseParser == nil {
+       var results []interface{}
+       if collector.args.ResponseParserWithDataErrors != nil {
                results, err = collector.args.ResponseParserWithDataErrors(query, variables, dataErrors)
        } else {
                results, err = collector.args.ResponseParser(query, variables)
@@ -317,33 +429,12 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
                        return
                }
        }
-
-       RAW_DATA_ORIGIN := "RawDataOrigin"
-       // batch save divider
-       for _, result := range results {
-               // get the batch operator for the specific type
-               batch, err := divider.ForType(reflect.TypeOf(result))
-               if err != nil {
-                       collector.checkError(err)
-                       return
-               }
-               // set raw data origin field
-               origin := reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN)
-               if origin.IsValid() {
-                       origin.Set(reflect.ValueOf(common.RawDataOrigin{
-                               RawDataTable:  collector.table,
-                               RawDataId:     row.ID,
-                               RawDataParams: row.Params,
-                       }))
-               }
-               // records get saved into db when slots were max outed
-               err = batch.Add(result)
-               if err != nil {
-                       collector.checkError(err)
-                       return
-               }
-               collector.args.Ctx.IncProgress(1)
+       err = collector.BatchSaveWithOrigin(divider, results, row)
+       if err != nil {
+               collector.checkError(err)
+               return
        }
+
        collector.args.Ctx.IncProgress(1)
        if handler != nil {
                // trigger next fetch, but return if ErrFinishCollect got from ResponseParser
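
The two helpers above are the heart of this change: BatchSaveWithOrigin centralizes the save-and-tag logic that fetchAsync previously inlined, and ExtractExistRawData replays the raw records already stored for the current params through the response parser before any new pages are fetched, so an incremental run re-extracts existing data instead of re-downloading it. A minimal, self-contained sketch of that replay-then-save flow; rawRecord, replayExisting, parse and save are hypothetical stand-ins for the raw table, ExtractExistRawData, ResponseParser and BatchSaveDivider:

    package main

    import "fmt"

    // rawRecord stands in for a row of the collector's raw table.
    type rawRecord struct {
            ID   uint64
            Data []byte // stored GraphQL response body
    }

    // replayExisting re-runs the parser over already-collected raw rows and
    // saves the results tagged with their raw origin, mirroring the
    // ExtractExistRawData -> BatchSaveWithOrigin flow above.
    func replayExisting(
            records []rawRecord,
            parse func(data []byte) ([]string, error),
            save func(rawID uint64, results []string) error,
    ) error {
            for _, row := range records {
                    results, err := parse(row.Data) // no network call: data comes from the raw layer
                    if err != nil {
                            return fmt.Errorf("parse raw #%d: %w", row.ID, err)
                    }
                    if err := save(row.ID, results); err != nil {
                            return err
                    }
            }
            return nil
    }

    func main() {
            records := []rawRecord{{ID: 1, Data: []byte(`{"issues":["a"]}`)}}
            err := replayExisting(records,
                    func(data []byte) ([]string, error) { return []string{string(data)}, nil },
                    func(rawID uint64, results []string) error {
                            fmt.Println("save", rawID, results)
                            return nil
                    },
            )
            if err != nil {
                    fmt.Println(err)
            }
    }
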
diff --git a/backend/plugins/github_graphql/impl/impl.go b/backend/plugins/github_graphql/impl/impl.go
index abc675989..1602a0514 100644
--- a/backend/plugins/github_graphql/impl/impl.go
+++ b/backend/plugins/github_graphql/impl/impl.go
@@ -83,7 +83,7 @@ func (p GithubGraphql) SubTaskMetas() []plugin.SubTaskMeta {
                // collect workflow run & job
                githubTasks.CollectRunsMeta,
                githubTasks.ExtractRunsMeta,
-               tasks.CollectCheckRunMeta,
+               tasks.CollectGraphqlJobsMeta,
 
                // collect others
                githubTasks.CollectApiCommentsMeta,
diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go b/backend/plugins/github_graphql/tasks/issue_collector.go
index 7fac6690e..5cddf35d3 100644
--- a/backend/plugins/github_graphql/tasks/issue_collector.go
+++ b/backend/plugins/github_graphql/tasks/issue_collector.go
@@ -42,7 +42,7 @@ type GraphqlQueryIssueWrapper struct {
                        TotalCount graphql.Int
                        Issues     []GraphqlQueryIssue `graphql:"nodes"`
                        PageInfo   *helper.GraphqlQueryPageInfo
-               } `graphql:"issues(first: $pageSize, after: $skipCursor, orderBy: {field: CREATED_AT, direction: DESC})"`
+               } `graphql:"issues(first: $pageSize, after: $skipCursor, orderBy: {field: CREATED_AT, direction: DESC}, filterBy: {since: $since})"`
        } `graphql:"repository(owner: $owner, name: $name)"`
 }
 
@@ -97,21 +97,35 @@ func CollectIssue(taskCtx plugin.SubTaskContext) errors.Error {
                return nil
        }
 
-       collector, err := helper.NewGraphqlCollector(helper.GraphqlCollectorArgs{
-               RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
-                       Ctx: taskCtx,
-                       Params: githubTasks.GithubApiParams{
-                               ConnectionId: data.Options.ConnectionId,
-                               Name:         data.Options.Name,
-                       },
-                       Table: RAW_ISSUES_TABLE,
+       collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+               Ctx: taskCtx,
+               Params: githubTasks.GithubApiParams{
+                       ConnectionId: data.Options.ConnectionId,
+                       Name:         data.Options.Name,
                },
+               Table: RAW_ISSUES_TABLE,
+       }, data.TimeAfter)
+       if err != nil {
+               return err
+       }
+
+       incremental := collectorWithState.IsIncremental()
+
+       err = collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
                GraphqlClient: data.GraphqlClient,
                PageSize:      100,
+               Incremental:   incremental,
                BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
+                       since := helper.DateTime{}
+                       if incremental {
+                               since = helper.DateTime{Time: *collectorWithState.LatestState.LatestSuccessStart}
+                       } else if collectorWithState.TimeAfter != nil {
+                               since = helper.DateTime{Time: *collectorWithState.TimeAfter}
+                       }
                        query := &GraphqlQueryIssueWrapper{}
                        ownerName := strings.Split(data.Options.Name, "/")
                        variables := map[string]interface{}{
+                               "since":      since,
                                "pageSize":   graphql.Int(reqData.Pager.Size),
                                "skipCursor": 
(*graphql.String)(reqData.Pager.SkipCursor),
                                "owner":      graphql.String(ownerName[0]),
@@ -130,10 +144,6 @@ func CollectIssue(taskCtx plugin.SubTaskContext) errors.Error {
                        results := make([]interface{}, 0, 1)
                        isFinish := false
                        for _, issue := range issues {
-                               if data.TimeAfter != nil && !data.TimeAfter.Before(issue.CreatedAt) {
-                                       isFinish = true
-                                       break
-                               }
                                githubIssue, err := convertGithubIssue(milestoneMap, issue, data.Options.ConnectionId, data.Options.GithubId)
                                if err != nil {
                                        return nil, err
@@ -166,12 +176,11 @@ func CollectIssue(taskCtx plugin.SubTaskContext) errors.Error {
                        }
                },
        })
-
        if err != nil {
                return err
        }
 
-       return collector.Execute()
+       return collectorWithState.Execute()
 }
 
 // create a milestone map for numberId to databaseId
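
Note how the issue collector now pushes the time bound into the GraphQL query itself through the since variable: an incremental run uses the start time of the last successful run, a fresh bounded run falls back to the configured timeAfter, and a full collection leaves it at the zero value. A self-contained sketch of that selection; sinceVariable and its parameter names are hypothetical, only DateTime mirrors the wrapper added above:

    package main

    import (
            "fmt"
            "time"
    )

    // DateTime mirrors the wrapper type added to graphql_collector.go.
    type DateTime struct{ time.Time }

    // sinceVariable mirrors the BuildQuery logic above: prefer the last
    // successful run start when incremental, else the timeAfter bound,
    // else the zero value for a full collection.
    func sinceVariable(incremental bool, lastSuccessStart, timeAfter *time.Time) DateTime {
            since := DateTime{}
            if incremental && lastSuccessStart != nil {
                    since = DateTime{Time: *lastSuccessStart}
            } else if timeAfter != nil {
                    since = DateTime{Time: *timeAfter}
            }
            return since
    }

    func main() {
            last := time.Date(2023, 2, 1, 0, 0, 0, 0, time.UTC)
            fmt.Println(sinceVariable(true, &last, nil))  // incremental: since last run
            fmt.Println(sinceVariable(false, nil, &last)) // full: since configured bound
    }
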
diff --git a/backend/plugins/github_graphql/tasks/check_run_collector.go b/backend/plugins/github_graphql/tasks/job_collector.go
similarity index 90%
rename from backend/plugins/github_graphql/tasks/check_run_collector.go
rename to backend/plugins/github_graphql/tasks/job_collector.go
index 65a4b4c12..9374a5595 100644
--- a/backend/plugins/github_graphql/tasks/check_run_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -32,7 +32,7 @@ import (
        "github.com/merico-dev/graphql"
 )
 
-const RAW_CHECK_RUNS_TABLE = "github_graphql_check_runs"
+const RAW_GRAPHQL_JOBS_TABLE = "github_graphql_jobs"
 
 type GraphqlQueryCheckRunWrapper struct {
        RateLimit struct {
@@ -42,12 +42,14 @@ type GraphqlQueryCheckRunWrapper struct {
 }
 
 type GraphqlQueryCheckSuite struct {
-       Id         string
-       Typename   string `graphql:"__typename"`
+       Id       string
+       Typename string `graphql:"__typename"`
+       // equivalent to a Run in the REST API
        CheckSuite struct {
                WorkflowRun struct {
                        DatabaseId int
                }
+               // equivalent to a Job in the REST API
                CheckRuns struct {
                        TotalCount int
                        Nodes      []struct {
@@ -86,28 +88,28 @@ type SimpleWorkflowRun struct {
        CheckSuiteNodeID string
 }
 
-var CollectCheckRunMeta = plugin.SubTaskMeta{
-       Name:             "CollectCheckRun",
-       EntryPoint:       CollectCheckRun,
+var CollectGraphqlJobsMeta = plugin.SubTaskMeta{
+       Name:             "CollectGraphqlJobs",
+       EntryPoint:       CollectGraphqlJobs,
        EnabledByDefault: true,
-       Description:      "Collect CheckRun data from GithubGraphql api",
+       Description:      "Collect Jobs(CheckRun) data from GithubGraphql api",
        DomainTypes:      []string{plugin.DOMAIN_TYPE_CICD},
 }
 
 var _ plugin.SubTaskEntryPoint = CollectAccount
 
-func CollectCheckRun(taskCtx plugin.SubTaskContext) errors.Error {
+func CollectGraphqlJobs(taskCtx plugin.SubTaskContext) errors.Error {
        logger := taskCtx.GetLogger()
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*githubTasks.GithubTaskData)
 
-       collectorWithState, err := helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
+       collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
                Ctx: taskCtx,
                Params: githubTasks.GithubApiParams{
                        ConnectionId: data.Options.ConnectionId,
                        Name:         data.Options.Name,
                },
-               Table: RAW_CHECK_RUNS_TABLE,
+               Table: RAW_GRAPHQL_JOBS_TABLE,
        }, data.TimeAfter)
        if err != nil {
                return err
@@ -121,9 +123,6 @@ func CollectCheckRun(taskCtx plugin.SubTaskContext) errors.Error {
                dal.Where("repo_id = ? and connection_id=?", 
data.Options.GithubId, data.Options.ConnectionId),
                dal.Orderby("github_updated_at DESC"),
        }
-       if collectorWithState.TimeAfter != nil {
-               clauses = append(clauses, dal.Where("github_created_at > ?", *collectorWithState.TimeAfter))
-       }
        if incremental {
                clauses = append(clauses, dal.Where("github_updated_at > ?", *collectorWithState.LatestState.LatestSuccessStart))
        }
@@ -203,7 +202,6 @@ func CollectCheckRun(taskCtx plugin.SubTaskContext) errors.Error {
                        return results, nil
                },
        })
-
        if err != nil {
                return err
        }
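
The job collector takes a different route: its incremental scope is decided at the database level. The workflow-run list is always ordered by github_updated_at DESC, and an incremental run appends a single updated-after clause derived from the last successful start, replacing the old github_created_at/TimeAfter filter. A rough sketch of that filter selection; buildRunFilters and latestSuccessStart are hypothetical stand-ins for the dal clauses and the collector state:

    package main

    import (
            "fmt"
            "time"
    )

    // buildRunFilters sketches the clause selection above: the base scope
    // is always repo + connection; an incremental run narrows to rows
    // updated after the last successful collection start.
    func buildRunFilters(incremental bool, latestSuccessStart *time.Time) []string {
            filters := []string{"repo_id = ? AND connection_id = ?"}
            if incremental && latestSuccessStart != nil {
                    filters = append(filters, "github_updated_at > ?")
            }
            return filters
    }

    func main() {
            t := time.Now().Add(-24 * time.Hour)
            fmt.Println(buildRunFilters(false, nil)) // full run: base scope only
            fmt.Println(buildRunFilters(true, &t))   // incremental: add updated-after filter
    }
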
diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go b/backend/plugins/github_graphql/tasks/pr_collector.go
index b4b67d81d..a71ea1c2e 100644
--- a/backend/plugins/github_graphql/tasks/pr_collector.go
+++ b/backend/plugins/github_graphql/tasks/pr_collector.go
@@ -36,12 +36,16 @@ type GraphqlQueryPrWrapper struct {
        RateLimit struct {
                Cost int
        }
+       // now it orders by UPDATED_AT and uses cursor pagination,
+       // so it may miss PRs that are updated while collection is running.
+       // These missed PRs will be picked up by the next run, but that is not enough.
+       // So in the next milestone (0.17) we should change it to filter by CREATED_AT and collect details.
        Repository struct {
                PullRequests struct {
                        PageInfo   *api.GraphqlQueryPageInfo
                        Prs        []GraphqlQueryPr `graphql:"nodes"`
                        TotalCount graphql.Int
-               } `graphql:"pullRequests(first: $pageSize, after: $skipCursor, orderBy: {field: CREATED_AT, direction: DESC})"`
+               } `graphql:"pullRequests(first: $pageSize, after: $skipCursor, orderBy: {field: UPDATED_AT, direction: DESC})"`
        } `graphql:"repository(owner: $owner, name: $name)"`
 }
 
@@ -130,31 +134,38 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
        config := data.Options.GithubTransformationRule
        var labelTypeRegex *regexp.Regexp
        var labelComponentRegex *regexp.Regexp
-       var err error
+       var err errors.Error
        if config != nil && len(config.PrType) > 0 {
-               labelTypeRegex, err = regexp.Compile(config.PrType)
+               labelTypeRegex, err = errors.Convert01(regexp.Compile(config.PrType))
                if err != nil {
                        return errors.Default.Wrap(err, "regexp Compile prType failed")
                }
        }
        if config != nil && len(config.PrComponent) > 0 {
-               labelComponentRegex, err = regexp.Compile(config.PrComponent)
+               labelComponentRegex, err = errors.Convert01(regexp.Compile(config.PrComponent))
                if err != nil {
                        return errors.Default.Wrap(err, "regexp Compile prComponent failed")
                }
        }
 
-       collector, err := api.NewGraphqlCollector(api.GraphqlCollectorArgs{
-               RawDataSubTaskArgs: api.RawDataSubTaskArgs{
-                       Ctx: taskCtx,
-                       Params: tasks.GithubApiParams{
-                               ConnectionId: data.Options.ConnectionId,
-                               Name:         data.Options.Name,
-                       },
-                       Table: RAW_PRS_TABLE,
+       collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+               Ctx: taskCtx,
+               Params: tasks.GithubApiParams{
+                       ConnectionId: data.Options.ConnectionId,
+                       Name:         data.Options.Name,
                },
+               Table: RAW_PRS_TABLE,
+       }, data.TimeAfter)
+       if err != nil {
+               return err
+       }
+
+       incremental := collectorWithState.IsIncremental()
+
+       err = collectorWithState.InitGraphQLCollector(api.GraphqlCollectorArgs{
                GraphqlClient: data.GraphqlClient,
                PageSize:      30,
+               Incremental:   incremental,
                /*
                        (Optional) Return query string for request, or you can plug them into UrlTemplate directly
                */
@@ -180,7 +191,8 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
                        results := make([]interface{}, 0, 1)
                        isFinish := false
                        for _, rawL := range prs {
-                               if data.TimeAfter != nil && !data.TimeAfter.Before(rawL.CreatedAt) {
+                               // collect all data even in incremental mode, because existing raw data gets re-extracted
+                               if collectorWithState.TimeAfter != nil && !collectorWithState.TimeAfter.Before(rawL.UpdatedAt) {
                                        isFinish = true
                                        break
                                }
@@ -291,12 +303,11 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
                        }
                },
        })
-
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
 
-       return collector.Execute()
+       return collectorWithState.Execute()
 }
 
 func convertGithubPullRequest(pull GraphqlQueryPr, connId uint64, repoId int) (*models.GithubPullRequest, errors.Error) {
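
Ordering pull requests by UPDATED_AT descending is what makes the early exit above safe: once one PR at or before the TimeAfter bound appears, every remaining PR on this and later pages is older still, so pagination can stop. A small sketch of that cutoff check under simplified types; pr and shouldStop are illustrative, not part of the plugin:

    package main

    import (
            "fmt"
            "time"
    )

    type pr struct {
            Number    int
            UpdatedAt time.Time
    }

    // shouldStop mirrors the cutoff above: with results ordered by
    // UPDATED_AT DESC, the first PR at or before timeAfter means every
    // following PR (and page) is older, so pagination can end.
    func shouldStop(p pr, timeAfter *time.Time) bool {
            return timeAfter != nil && !timeAfter.Before(p.UpdatedAt)
    }

    func main() {
            cutoff := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
            page := []pr{
                    {Number: 101, UpdatedAt: time.Date(2023, 2, 1, 0, 0, 0, 0, time.UTC)},
                    {Number: 42, UpdatedAt: time.Date(2022, 12, 1, 0, 0, 0, 0, time.UTC)},
            }
            for _, p := range page {
                    if shouldStop(p, &cutoff) {
                            fmt.Println("stop before PR", p.Number)
                            break
                    }
                    fmt.Println("collect PR", p.Number)
            }
    }
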
