This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 61005cd1e fix: move extract existing graphql data to general position (#4921)
61005cd1e is described below

commit 61005cd1e4cf3a5c600bf0e99e3805382c32dc57
Author: Likyh <[email protected]>
AuthorDate: Thu Apr 13 21:17:15 2023 +0800

    fix: move extract existing graphql data to general position (#4921)
    
    Co-authored-by: abeizn <[email protected]>
---
 backend/helpers/pluginhelper/api/api_collector.go  |  2 +-
 .../helpers/pluginhelper/api/graphql_collector.go  | 22 ++++++++++++----------
 .../github_graphql/tasks/account_collector.go      |  5 ++++-
 .../github_graphql/tasks/issue_collector.go        |  5 ++++-
 .../plugins/github_graphql/tasks/job_collector.go  |  5 ++++-
 .../plugins/github_graphql/tasks/pr_collector.go   |  6 ++++--
 6 files changed, 29 insertions(+), 16 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/api_collector.go b/backend/helpers/pluginhelper/api/api_collector.go
index 27b6b8c5c..845658bef 100644
--- a/backend/helpers/pluginhelper/api/api_collector.go
+++ b/backend/helpers/pluginhelper/api/api_collector.go
@@ -425,7 +425,7 @@ func (collector *ApiCollector) fetchAsync(reqData 
*RequestData, handler func(int
                items, err := collector.args.ResponseParser(res)
                if err != nil {
                        if errors.Is(err, ErrFinishCollect) {
-                               logger.Info("a fetch stop by parser, reqInput: 
#%d", reqData.Params)
+                               logger.Info("a fetch stop by parser, reqInput: 
#%s", reqData.Params)
                                handler = nil
                        } else {
                                return errors.Default.Wrap(err, 
fmt.Sprintf("error parsing response from %s", apiUrl))
diff --git a/backend/helpers/pluginhelper/api/graphql_collector.go b/backend/helpers/pluginhelper/api/graphql_collector.go
index de364cfd6..67e3eca80 100644
--- a/backend/helpers/pluginhelper/api/graphql_collector.go
+++ b/backend/helpers/pluginhelper/api/graphql_collector.go
@@ -134,16 +134,23 @@ func (collector *GraphqlCollector) Execute() errors.Error 
{
        if err != nil {
                return errors.Default.Wrap(err, "error running auto-migrate")
        }
+
+       divider := NewBatchSaveDivider(collector.args.Ctx, 
collector.args.BatchSize, collector.table, collector.params)
+
        // flush data if not incremental collection
-       if !collector.args.Incremental {
+       if collector.args.Incremental {
+               // re extract data for new transformation rules
+               err = collector.ExtractExistRawData(divider)
+               if err != nil {
+                       collector.checkError(err)
+               }
+       } else {
                err = db.Delete(&RawData{}, dal.From(collector.table), 
dal.Where("params = ?", collector.params))
                if err != nil {
                        return errors.Default.Wrap(err, "error deleting data 
from collector")
                }
        }
 
-       divider := NewBatchSaveDivider(collector.args.Ctx, 
collector.args.BatchSize, collector.table, collector.params)
-
        collector.args.Ctx.SetProgress(0, -1)
        if collector.args.Input != nil {
                iterator := collector.args.Input
@@ -208,11 +215,6 @@ func (collector *GraphqlCollector) exec(divider 
*BatchSaveDivider, input interfa
                SkipCursor: nil,
                Size:       collector.args.PageSize,
        }
-       err = collector.ExtractExistRawData(divider, reqData)
-       if err != nil {
-               collector.checkError(err)
-               return
-       }
        if collector.args.GetPageInfo != nil {
                collector.fetchOneByOne(divider, reqData)
        } else {
@@ -280,7 +282,7 @@ func (collector *GraphqlCollector) 
BatchSaveWithOrigin(divider *BatchSaveDivider
 }
 
 // ExtractExistRawData will extract data from existing data from raw layer if 
increment
-func (collector *GraphqlCollector) ExtractExistRawData(divider 
*BatchSaveDivider, reqData *GraphqlRequestData) errors.Error {
+func (collector *GraphqlCollector) ExtractExistRawData(divider 
*BatchSaveDivider) errors.Error {
        // load data from database
        db := collector.args.Ctx.GetDal()
        logger := collector.args.Ctx.GetLogger()
@@ -304,7 +306,7 @@ func (collector *GraphqlCollector) 
ExtractExistRawData(divider *BatchSaveDivider
        row := &RawData{}
 
        // get the type of query and variables
-       query, variables, _ := collector.args.BuildQuery(reqData)
+       query, variables, _ := collector.args.BuildQuery(nil)
 
        // prgress
        collector.args.Ctx.SetProgress(0, -1)
diff --git a/backend/plugins/github_graphql/tasks/account_collector.go b/backend/plugins/github_graphql/tasks/account_collector.go
index c7b2924a6..73d627239 100644
--- a/backend/plugins/github_graphql/tasks/account_collector.go
+++ b/backend/plugins/github_graphql/tasks/account_collector.go
@@ -101,8 +101,11 @@ func CollectAccount(taskCtx plugin.SubTaskContext) 
errors.Error {
                InputStep:     100,
                GraphqlClient: data.GraphqlClient,
                BuildQuery: func(reqData *helper.GraphqlRequestData) 
(interface{}, map[string]interface{}, error) {
-                       accounts := reqData.Input.([]interface{})
                        query := &GraphqlQueryAccountWrapper{}
+                       if reqData == nil {
+                               return query, map[string]interface{}{}, nil
+                       }
+                       accounts := reqData.Input.([]interface{})
                        users := []map[string]interface{}{}
                        for _, iAccount := range accounts {
                                account := iAccount.(*SimpleAccount)
diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go b/backend/plugins/github_graphql/tasks/issue_collector.go
index 53aafe66d..87928c421 100644
--- a/backend/plugins/github_graphql/tasks/issue_collector.go
+++ b/backend/plugins/github_graphql/tasks/issue_collector.go
@@ -116,13 +116,16 @@ func CollectIssue(taskCtx plugin.SubTaskContext) 
errors.Error {
                PageSize:      100,
                Incremental:   incremental,
                BuildQuery: func(reqData *helper.GraphqlRequestData) 
(interface{}, map[string]interface{}, error) {
+                       query := &GraphqlQueryIssueWrapper{}
+                       if reqData == nil {
+                               return query, map[string]interface{}{}, nil
+                       }
                        since := helper.DateTime{}
                        if incremental {
                                since = helper.DateTime{Time: 
*collectorWithState.LatestState.LatestSuccessStart}
                        } else if collectorWithState.TimeAfter != nil {
                                since = helper.DateTime{Time: 
*collectorWithState.TimeAfter}
                        }
-                       query := &GraphqlQueryIssueWrapper{}
                        ownerName := strings.Split(data.Options.Name, "/")
                        variables := map[string]interface{}{
                                "since":      since,
diff --git a/backend/plugins/github_graphql/tasks/job_collector.go b/backend/plugins/github_graphql/tasks/job_collector.go
index 7c992f4d6..27ec6274b 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -144,8 +144,11 @@ func CollectGraphqlJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
                Incremental:   incremental,
                GraphqlClient: data.GraphqlClient,
                BuildQuery: func(reqData *helper.GraphqlRequestData) 
(interface{}, map[string]interface{}, error) {
-                       workflowRuns := reqData.Input.([]interface{})
                        query := &GraphqlQueryCheckRunWrapper{}
+                       if reqData == nil {
+                               return query, map[string]interface{}{}, nil
+                       }
+                       workflowRuns := reqData.Input.([]interface{})
                        checkSuiteIds := []map[string]interface{}{}
                        for _, iWorkflowRuns := range workflowRuns {
                                workflowRun := 
iWorkflowRuns.(*SimpleWorkflowRun)
diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go b/backend/plugins/github_graphql/tasks/pr_collector.go
index 510bde4e9..20b3fa9e1 100644
--- a/backend/plugins/github_graphql/tasks/pr_collector.go
+++ b/backend/plugins/github_graphql/tasks/pr_collector.go
@@ -171,6 +171,9 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
                */
                BuildQuery: func(reqData *api.GraphqlRequestData) (interface{}, 
map[string]interface{}, error) {
                        query := &GraphqlQueryPrWrapper{}
+                       if reqData == nil {
+                               return query, map[string]interface{}{}, nil
+                       }
                        ownerName := strings.Split(data.Options.Name, "/")
                        variables := map[string]interface{}{
                                "pageSize":   graphql.Int(reqData.Pager.Size),
@@ -191,12 +194,11 @@ func CollectPr(taskCtx plugin.SubTaskContext) 
errors.Error {
                        results := make([]interface{}, 0, 1)
                        isFinish := false
                        for _, rawL := range prs {
-                               // collect all data even though in increment 
mode because of existing data extracting
+                               // collect data even though in increment mode 
because of updating existing data
                                if collectorWithState.TimeAfter != nil && 
!collectorWithState.TimeAfter.Before(rawL.UpdatedAt) {
                                        isFinish = true
                                        break
                                }
-                               //If this is a pr, ignore
                                githubPr, err := convertGithubPullRequest(rawL, 
data.Options.ConnectionId, data.Options.GithubId)
                                if err != nil {
                                        return nil, err

Reply via email to