This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch kw-7852-components-field-length
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git

commit 5a6c3f933c828fb0f0d2172802e9a396795212ab
Author: Klesh Wong <[email protected]>
AuthorDate: Mon Aug 5 22:08:03 2024 +0800

    refactor: remove extraction logic from github graphql collector
---
 backend/helpers/pluginhelper/api/batch_save.go     |   2 +-
 .../helpers/pluginhelper/api/graphql_collector.go  | 205 +++++----------------
 .../github_graphql/tasks/account_collector.go      |  12 +-
 .../github_graphql/tasks/deployment_collector.go   |  10 +-
 .../github_graphql/tasks/issue_collector.go        |  10 +-
 .../plugins/github_graphql/tasks/job_collector.go  |  20 +-
 .../plugins/github_graphql/tasks/pr_collector.go   |  13 +-
 .../plugins/github_graphql/tasks/pr_extractor.go   | 137 +++++++-------
 .../github_graphql/tasks/release_collector.go      |  17 +-
 9 files changed, 164 insertions(+), 262 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/batch_save.go 
b/backend/helpers/pluginhelper/api/batch_save.go
index 623082536..42a71c18d 100644
--- a/backend/helpers/pluginhelper/api/batch_save.go
+++ b/backend/helpers/pluginhelper/api/batch_save.go
@@ -50,7 +50,7 @@ type BatchSave struct {
 // NewBatchSave creates a new BatchSave instance
 func NewBatchSave(basicRes context.BasicRes, slotType reflect.Type, size int, 
tableName ...string) (*BatchSave, errors.Error) {
        if slotType.Kind() != reflect.Ptr {
-               return nil, errors.Default.New("slotType must be a pointer")
+               panic(errors.Default.New("slotType must be a pointer"))
        }
        db := basicRes.GetDal()
        primaryKey := db.GetPrimaryKeyFields(slotType)
diff --git a/backend/helpers/pluginhelper/api/graphql_collector.go 
b/backend/helpers/pluginhelper/api/graphql_collector.go
index d0adbbabb..f1d74a7cb 100644
--- a/backend/helpers/pluginhelper/api/graphql_collector.go
+++ b/backend/helpers/pluginhelper/api/graphql_collector.go
@@ -26,7 +26,6 @@ import (
 
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
-       "github.com/apache/incubator-devlake/core/models/common"
        plugin "github.com/apache/incubator-devlake/core/plugin"
        "github.com/merico-dev/graphql"
 )
@@ -79,8 +78,8 @@ type GraphqlCollectorArgs struct {
        GetPageInfo func(query interface{}, args *GraphqlCollectorArgs) 
(*GraphqlQueryPageInfo, error)
        BatchSize   int
        // one of ResponseParser and ResponseParserEvenWhenDataErrors is 
required to parse response
-       ResponseParser               func(query interface{}, variables 
map[string]interface{}) ([]interface{}, error)
-       ResponseParserWithDataErrors func(query interface{}, variables 
map[string]interface{}, dataErrors []graphql.DataError) ([]interface{}, error)
+       ResponseParser    func(queryWrapper interface{}) ([]json.RawMessage, 
errors.Error)
+       IgnoreQueryErrors bool
 }
 
 // GraphqlCollector help you collect data from Graphql services
@@ -88,6 +87,7 @@ type GraphqlCollector struct {
        *RawDataSubTask
        args         *GraphqlCollectorArgs
        workerErrors []error
+       batchSave    *BatchSave
 }
 
 // ErrFinishCollect is an error which will finish this collector
@@ -105,19 +105,25 @@ func NewGraphqlCollector(args GraphqlCollectorArgs) 
(*GraphqlCollector, errors.E
        if args.GraphqlClient == nil {
                return nil, errors.Default.New("ApiClient is required")
        }
-       if args.ResponseParser == nil && args.ResponseParserWithDataErrors == 
nil {
+       if args.ResponseParser == nil {
                return nil, errors.Default.New("one of ResponseParser and 
ResponseParserWithDataErrors is required")
        }
-       apiCollector := &GraphqlCollector{
-               RawDataSubTask: rawDataSubTask,
-               args:           &args,
-       }
        if args.BatchSize == 0 {
-               args.BatchSize = 500
+               args.BatchSize = 100
        }
        if args.InputStep == 0 {
                args.InputStep = 1
        }
+       apiCollector := &GraphqlCollector{
+               RawDataSubTask: rawDataSubTask,
+               args:           &args,
+               batchSave: errors.Must1(NewBatchSave(
+                       args.Ctx,
+                       reflect.TypeOf(&RawData{}),
+                       args.BatchSize,
+                       rawDataSubTask.table,
+               )),
+       }
        return apiCollector, nil
 }
 
@@ -133,21 +139,13 @@ func (collector *GraphqlCollector) Execute() errors.Error 
{
                return errors.Default.Wrap(err, "error running auto-migrate")
        }
 
-       divider := NewBatchSaveDivider(collector.args.Ctx, 
collector.args.BatchSize, collector.table, collector.params)
-
        isIncremental := collector.args.Incremental
        syncPolicy := collector.args.Ctx.TaskContext().SyncPolicy()
        if syncPolicy != nil && syncPolicy.FullSync {
                isIncremental = false
        }
        // flush data if not incremental collection
-       if isIncremental {
-               // re extract data for new scope config
-               err = collector.ExtractExistRawData(divider)
-               if err != nil {
-                       collector.checkError(err)
-               }
-       } else {
+       if !isIncremental {
                err = db.Delete(&RawData{}, dal.From(collector.table), 
dal.Where("params = ?", collector.params))
                if err != nil {
                        return errors.Default.Wrap(err, "error deleting data 
from collector")
@@ -166,7 +164,7 @@ func (collector *GraphqlCollector) Execute() errors.Error {
                                        collector.checkError(err)
                                        break
                                }
-                               collector.exec(divider, input)
+                               collector.exec(input)
                        }
                } else {
                        for !collector.HasError() {
@@ -182,12 +180,12 @@ func (collector *GraphqlCollector) Execute() errors.Error 
{
                                if inputs == nil {
                                        break
                                }
-                               collector.exec(divider, inputs)
+                               collector.exec(inputs)
                        }
                }
        } else {
                // or we just did it once
-               collector.exec(divider, nil)
+               collector.exec(nil)
        }
 
        logger.Debug("wait for all async api to finished")
@@ -202,11 +200,11 @@ func (collector *GraphqlCollector) Execute() errors.Error 
{
                logger.Info("ended api collection without error")
        }
 
-       err = divider.Close()
+       err = collector.batchSave.Close()
        return err
 }
 
-func (collector *GraphqlCollector) exec(divider *BatchSaveDivider, input 
interface{}) {
+func (collector *GraphqlCollector) exec(input interface{}) {
        inputJson, err := json.Marshal(input)
        if err != nil {
                collector.checkError(errors.Default.Wrap(err, `input can not be 
marshal to json`))
@@ -219,14 +217,14 @@ func (collector *GraphqlCollector) exec(divider 
*BatchSaveDivider, input interfa
                Size:       collector.args.PageSize,
        }
        if collector.args.GetPageInfo != nil {
-               collector.fetchOneByOne(divider, reqData)
+               collector.fetchOneByOne(reqData)
        } else {
-               collector.fetchAsync(divider, reqData, nil)
+               collector.fetchAsync(reqData, nil)
        }
 }
 
 // fetchOneByOne fetches data of all pages for APIs that return paging 
information
-func (collector *GraphqlCollector) fetchOneByOne(divider *BatchSaveDivider, 
reqData *GraphqlRequestData) {
+func (collector *GraphqlCollector) fetchOneByOne(reqData *GraphqlRequestData) {
        // fetch first page
        var fetchNextPage func(query interface{}) errors.Error
        fetchNextPage = func(query interface{}) errors.Error {
@@ -247,119 +245,16 @@ func (collector *GraphqlCollector) fetchOneByOne(divider 
*BatchSaveDivider, reqD
                                        Input:     reqData.Input,
                                        InputJSON: reqData.InputJSON,
                                }
-                               collector.fetchAsync(divider, reqDataTemp, 
fetchNextPage)
+                               collector.fetchAsync(reqDataTemp, fetchNextPage)
                                return nil
                        }, collector.checkError)
                }
                return nil
        }
-       collector.fetchAsync(divider, reqData, fetchNextPage)
-}
-
-// BatchSaveWithOrigin save the results and fill raw data origin for them
-func (collector *GraphqlCollector) BatchSaveWithOrigin(divider 
*BatchSaveDivider, results []interface{}, row *RawData) errors.Error {
-       // batch save divider
-       RAW_DATA_ORIGIN := "RawDataOrigin"
-       for _, result := range results {
-               // get the batch operator for the specific type
-               batch, err := divider.ForType(reflect.TypeOf(result))
-               if err != nil {
-                       return err
-               }
-               // set raw data origin field
-               origin := 
reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN)
-               if origin.IsValid() && origin.IsZero() {
-                       origin.Set(reflect.ValueOf(common.RawDataOrigin{
-                               RawDataTable:  collector.table,
-                               RawDataId:     row.ID,
-                               RawDataParams: row.Params,
-                       }))
-               }
-               // records get saved into db when slots were max outed
-               err = batch.Add(result)
-               if err != nil {
-                       return errors.Default.Wrap(err, "error adding result to 
batch")
-               }
-       }
-       return nil
-}
-
-// ExtractExistRawData will extract data from existing data from raw layer if 
increment
-func (collector *GraphqlCollector) ExtractExistRawData(divider 
*BatchSaveDivider) errors.Error {
-       // load data from database
-       db := collector.args.Ctx.GetDal()
-       logger := collector.args.Ctx.GetLogger()
-
-       clauses := []dal.Clause{
-               dal.From(collector.table),
-               dal.Where("params = ?", collector.params),
-               dal.Orderby("id ASC"),
-       }
-
-       count, err := db.Count(clauses...)
-       if err != nil {
-               return errors.Default.Wrap(err, "error getting count of 
clauses")
-       }
-       cursor, err := db.Cursor(clauses...)
-       if err != nil {
-               return errors.Default.Wrap(err, "error running DB query")
-       }
-       logger.Info("get data from %s where params=%s and got %d", 
collector.table, collector.params, count)
-       defer cursor.Close()
-
-       // progress
-       collector.args.Ctx.SetProgress(0, -1)
-       ctx := collector.args.Ctx.GetContext()
-       // iterate all rows
-       for cursor.Next() {
-               select {
-               case <-ctx.Done():
-                       return errors.Convert(ctx.Err())
-               default:
-               }
-               // get the type of query and variables. For each iteration, the 
query should be a different object
-               query, variables, _ := collector.args.BuildQuery(nil)
-               row := &RawData{}
-               err = db.Fetch(cursor, row)
-               if err != nil {
-                       return errors.Default.Wrap(err, "error fetching row")
-               }
-
-               err = errors.Convert(json.Unmarshal(row.Data, &query))
-               if err != nil {
-                       return errors.Default.Wrap(err, `graphql collector 
unmarshal query failed`)
-               }
-               err = errors.Convert(json.Unmarshal(row.Input, &variables))
-               if err != nil {
-                       return errors.Default.Wrap(err, `variables in graphql 
query can not unmarshal from json`)
-               }
-
-               var results []interface{}
-               if collector.args.ResponseParserWithDataErrors != nil {
-                       results, err = 
errors.Convert01(collector.args.ResponseParserWithDataErrors(query, variables, 
nil))
-               } else {
-                       results, err = 
errors.Convert01(collector.args.ResponseParser(query, variables))
-               }
-               if err != nil {
-                       if errors.Is(err, ErrFinishCollect) {
-                               logger.Info("existing data parser return 
ErrFinishCollect, but skip. rawId: #%d", row.ID)
-                       } else {
-                               return errors.Default.Wrap(err, "error calling 
plugin Extract implementation")
-                       }
-               }
-               err = collector.BatchSaveWithOrigin(divider, results, row)
-               if err != nil {
-                       return err
-               }
-
-               collector.args.Ctx.IncProgress(1)
-       }
-
-       // save the last batches
-       return divider.Close()
+       collector.fetchAsync(reqData, fetchNextPage)
 }
 
-func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, 
reqData *GraphqlRequestData, handler func(query interface{}) errors.Error) {
+func (collector *GraphqlCollector) fetchAsync(reqData *GraphqlRequestData, 
handler func(query interface{}) errors.Error) {
        if reqData.Pager == nil {
                reqData.Pager = &CursorPager{
                        SkipCursor: nil,
@@ -373,6 +268,7 @@ func (collector *GraphqlCollector) fetchAsync(divider 
*BatchSaveDivider, reqData
        }
 
        logger := collector.args.Ctx.GetLogger()
+       db := collector.args.Ctx.GetDal()
        dataErrors, err := collector.args.GraphqlClient.Query(query, variables)
        if err != nil {
                if err == context.Canceled {
@@ -384,7 +280,7 @@ func (collector *GraphqlCollector) fetchAsync(divider 
*BatchSaveDivider, reqData
                return
        }
        if len(dataErrors) > 0 {
-               if collector.args.ResponseParserWithDataErrors == nil {
+               if !collector.args.IgnoreQueryErrors {
                        for _, dataError := range dataErrors {
                                
collector.checkError(errors.Default.Wrap(dataError, `graphql query got error`))
                        }
@@ -394,50 +290,37 @@ func (collector *GraphqlCollector) fetchAsync(divider 
*BatchSaveDivider, reqData
        }
        defer logger.Debug("fetchAsync >>> done for %v %v", query, variables)
 
-       paramsBytes, err := json.Marshal(query)
-       if err != nil {
-               collector.checkError(errors.Default.Wrap(err, `graphql 
collector marshal query failed`))
-               return
-       }
-       db := collector.args.Ctx.GetDal()
        queryStr, _ := graphql.ConstructQuery(query, variables)
        variablesJson, err := json.Marshal(variables)
        if err != nil {
                collector.checkError(errors.Default.Wrap(err, `variables in 
graphql query can not marshal to json`))
                return
        }
-       row := &RawData{
-               Params: collector.params,
-               Data:   paramsBytes,
-               Url:    queryStr,
-               Input:  variablesJson,
-       }
-       err = db.Create(row, dal.From(collector.table))
-       if err != nil {
-               collector.checkError(errors.Default.Wrap(err, `not created row 
table in graphql collector`))
-               return
-       }
 
-       var results []interface{}
-       if collector.args.ResponseParserWithDataErrors != nil {
-               results, err = 
collector.args.ResponseParserWithDataErrors(query, variables, dataErrors)
-       } else {
-               results, err = collector.args.ResponseParser(query, variables)
+       results, err := collector.args.ResponseParser(query)
+       for _, result := range results {
+               row := &RawData{
+                       Params: collector.params,
+                       Data:   result,
+                       Url:    queryStr,
+                       Input:  variablesJson,
+               }
+               // collector.batchSave.Add(row)
+               err = db.Create(row, dal.From(collector.table))
+               if err != nil {
+                       collector.checkError(errors.Default.Wrap(err, `not 
created row table in graphql collector`))
+                       return
+               }
        }
        if err != nil {
                if errors.Is(err, ErrFinishCollect) {
-                       logger.Info("collector finish by parser, rawId: #%d", 
row.ID)
+                       logger.Info("collector finish by parser")
                        handler = nil
                } else {
                        collector.checkError(errors.Default.Wrap(err, `not 
parsed response in graphql collector`))
                        return
                }
        }
-       err = collector.BatchSaveWithOrigin(divider, results, row)
-       if err != nil {
-               collector.checkError(err)
-               return
-       }
 
        collector.args.Ctx.IncProgress(1)
        if handler != nil {
diff --git a/backend/plugins/github_graphql/tasks/account_collector.go 
b/backend/plugins/github_graphql/tasks/account_collector.go
index ad16068c4..27d71b65f 100644
--- a/backend/plugins/github_graphql/tasks/account_collector.go
+++ b/backend/plugins/github_graphql/tasks/account_collector.go
@@ -18,6 +18,7 @@ limitations under the License.
 package tasks
 
 import (
+       "encoding/json"
        "reflect"
 
        "github.com/apache/incubator-devlake/core/dal"
@@ -118,13 +119,14 @@ func CollectAccount(taskCtx plugin.SubTaskContext) 
errors.Error {
                        }
                        return query, variables, nil
                },
-               ResponseParserWithDataErrors: func(iQuery interface{}, 
variables map[string]interface{}, dataErrors []graphql.DataError) 
([]interface{}, error) {
-                       for _, dataError := range dataErrors {
-                               // log and ignore
-                               taskCtx.GetLogger().Warn(dataError, `query user 
get error but ignore`)
+               ResponseParser: func(queryWrapper any) (messages 
[]json.RawMessage, err errors.Error) {
+                       query := queryWrapper.(*GraphqlQueryAccountWrapper)
+                       for _, rawL := range query.Users {
+                               messages = append(messages, 
errors.Must1(json.Marshal(rawL)))
                        }
-                       return nil, nil
+                       return
                },
+               IgnoreQueryErrors: true,
        })
 
        if err != nil {
diff --git a/backend/plugins/github_graphql/tasks/deployment_collector.go 
b/backend/plugins/github_graphql/tasks/deployment_collector.go
index 99ddc493f..b28c8deca 100644
--- a/backend/plugins/github_graphql/tasks/deployment_collector.go
+++ b/backend/plugins/github_graphql/tasks/deployment_collector.go
@@ -18,6 +18,7 @@ limitations under the License.
 package tasks
 
 import (
+       "encoding/json"
        "strings"
        "time"
 
@@ -129,15 +130,16 @@ func CollectDeployments(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query := iQuery.(*GraphqlQueryDeploymentWrapper)
                        return query.Repository.Deployments.PageInfo, nil
                },
-               ResponseParser: func(iQuery interface{}, variables 
map[string]interface{}) ([]interface{}, error) {
-                       query := iQuery.(*GraphqlQueryDeploymentWrapper)
+               ResponseParser: func(queryWrapper any) (messages 
[]json.RawMessage, err errors.Error) {
+                       query := queryWrapper.(*GraphqlQueryDeploymentWrapper)
                        deployments := query.Repository.Deployments.Deployments
                        for _, rawL := range deployments {
                                if apiCollector.GetSince() != nil && 
!apiCollector.GetSince().Before(rawL.UpdatedAt) {
-                                       return nil, helper.ErrFinishCollect
+                                       return messages, helper.ErrFinishCollect
                                }
+                               messages = append(messages, 
errors.Must1(json.Marshal(rawL)))
                        }
-                       return nil, nil
+                       return
                },
        })
        if err != nil {
diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go 
b/backend/plugins/github_graphql/tasks/issue_collector.go
index dbf1b1e95..f2d27cc5f 100644
--- a/backend/plugins/github_graphql/tasks/issue_collector.go
+++ b/backend/plugins/github_graphql/tasks/issue_collector.go
@@ -18,6 +18,7 @@ limitations under the License.
 package tasks
 
 import (
+       "encoding/json"
        "strings"
        "time"
 
@@ -115,15 +116,16 @@ func CollectIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query := iQuery.(*GraphqlQueryIssueWrapper)
                        return query.Repository.IssueList.PageInfo, nil
                },
-               ResponseParser: func(iQuery interface{}, variables 
map[string]interface{}) ([]interface{}, error) {
-                       query := iQuery.(*GraphqlQueryIssueWrapper)
+               ResponseParser: func(queryWrapper any) (messages 
[]json.RawMessage, err errors.Error) {
+                       query := queryWrapper.(*GraphqlQueryIssueWrapper)
                        issues := query.Repository.IssueList.Issues
                        for _, rawL := range issues {
                                if apiCollector.GetSince() != nil && 
!apiCollector.GetSince().Before(rawL.UpdatedAt) {
-                                       return nil, helper.ErrFinishCollect
+                                       return messages, helper.ErrFinishCollect
                                }
+                               messages = append(messages, 
errors.Must1(json.Marshal(rawL)))
                        }
-                       return nil, nil
+                       return
                },
        })
        if err != nil {
diff --git a/backend/plugins/github_graphql/tasks/job_collector.go 
b/backend/plugins/github_graphql/tasks/job_collector.go
index effac75ff..10694e56a 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -18,6 +18,7 @@ limitations under the License.
 package tasks
 
 import (
+       "encoding/json"
        "reflect"
        "time"
 
@@ -156,13 +157,22 @@ func CollectJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        }
                        return query, variables, nil
                },
-               ResponseParserWithDataErrors: func(iQuery interface{}, 
variables map[string]interface{}, dataErrors []graphql.DataError) 
([]interface{}, error) {
-                       for _, dataError := range dataErrors {
-                               // log and ignore
-                               taskCtx.GetLogger().Warn(dataError, `query 
check run get error but ignore`)
+               ResponseParser: func(queryWrapper any) (messages 
[]json.RawMessage, err errors.Error) {
+                       query := queryWrapper.(*GraphqlQueryCheckRunWrapper)
+                       for _, node := range query.Node {
+                               checkRun := node.CheckSuite.CheckRuns.Nodes[0]
+                               updatedAt := checkRun.StartedAt
+                               if checkRun.CompletedAt != nil {
+                                       updatedAt = checkRun.CompletedAt
+                               }
+                               if apiCollector.GetSince() != nil && 
!apiCollector.GetSince().Before(*updatedAt) {
+                                       return messages, helper.ErrFinishCollect
+                               }
+                               messages = append(messages, 
errors.Must1(json.Marshal(node)))
                        }
-                       return nil, nil
+                       return
                },
+               IgnoreQueryErrors: true,
        })
        if err != nil {
                return err
diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go 
b/backend/plugins/github_graphql/tasks/pr_collector.go
index a129d3b31..9e457e5b9 100644
--- a/backend/plugins/github_graphql/tasks/pr_collector.go
+++ b/backend/plugins/github_graphql/tasks/pr_collector.go
@@ -18,6 +18,7 @@ limitations under the License.
 package tasks
 
 import (
+       "encoding/json"
        "strings"
        "time"
 
@@ -170,6 +171,7 @@ func CollectPrs(taskCtx plugin.SubTaskContext) errors.Error 
{
                return err
        }
 
+       since := apiCollector.GetSince()
        err = apiCollector.InitGraphQLCollector(api.GraphqlCollectorArgs{
                GraphqlClient: data.GraphqlClient,
                PageSize:      10,
@@ -194,15 +196,16 @@ func CollectPrs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query := iQuery.(*GraphqlQueryPrWrapper)
                        return query.Repository.PullRequests.PageInfo, nil
                },
-               ResponseParser: func(iQuery interface{}, variables 
map[string]interface{}) ([]interface{}, error) {
-                       query := iQuery.(*GraphqlQueryPrWrapper)
+               ResponseParser: func(queryWrapper any) (messages 
[]json.RawMessage, err errors.Error) {
+                       query := queryWrapper.(*GraphqlQueryPrWrapper)
                        prs := query.Repository.PullRequests.Prs
                        for _, rawL := range prs {
-                               if apiCollector.GetSince() != nil && 
!apiCollector.GetSince().Before(rawL.CreatedAt) {
-                                       return nil, api.ErrFinishCollect
+                               if since != nil && since.After(rawL.UpdatedAt) {
+                                       return messages, api.ErrFinishCollect
                                }
+                               messages = append(messages, 
errors.Must1(json.Marshal(rawL)))
                        }
-                       return nil, nil
+                       return
                },
        })
        if err != nil {
diff --git a/backend/plugins/github_graphql/tasks/pr_extractor.go 
b/backend/plugins/github_graphql/tasks/pr_extractor.go
index 0b4885328..59bc72cdb 100644
--- a/backend/plugins/github_graphql/tasks/pr_extractor.go
+++ b/backend/plugins/github_graphql/tasks/pr_extractor.go
@@ -67,88 +67,85 @@ func ExtractPrs(taskCtx plugin.SubTaskContext) errors.Error 
{
                        Table: RAW_PRS_TABLE,
                },
                Extract: func(row *api.RawData) ([]interface{}, errors.Error) {
-                       apiPr := &GraphqlQueryPrWrapper{}
-                       err := errors.Convert(json.Unmarshal(row.Data, apiPr))
+                       rawL := &GraphqlQueryPr{}
+                       err := errors.Convert(json.Unmarshal(row.Data, rawL))
                        if err != nil {
                                return nil, err
                        }
 
-                       prs := apiPr.Repository.PullRequests.Prs
                        results := make([]interface{}, 0, 1)
-                       for _, rawL := range prs {
-                               githubPr, err := convertGithubPullRequest(rawL, 
data.Options.ConnectionId, data.Options.GithubId)
-                               if err != nil {
-                                       return nil, err
+                       githubPr, err := convertGithubPullRequest(rawL, 
data.Options.ConnectionId, data.Options.GithubId)
+                       if err != nil {
+                               return nil, err
+                       }
+                       extractGraphqlPreAccount(&results, rawL.Author, 
data.Options.GithubId, data.Options.ConnectionId)
+                       for _, label := range rawL.Labels.Nodes {
+                               results = append(results, &models.GithubPrLabel{
+                                       ConnectionId: data.Options.ConnectionId,
+                                       PullId:       githubPr.GithubId,
+                                       LabelName:    label.Name,
+                               })
+                               // if pr.Type has not been set and prType is 
set in .env, process the below
+                               if labelTypeRegex != nil && 
labelTypeRegex.MatchString(label.Name) {
+                                       githubPr.Type = label.Name
                                }
-                               extractGraphqlPreAccount(&results, rawL.Author, 
data.Options.GithubId, data.Options.ConnectionId)
-                               for _, label := range rawL.Labels.Nodes {
-                                       results = append(results, 
&models.GithubPrLabel{
-                                               ConnectionId: 
data.Options.ConnectionId,
-                                               PullId:       githubPr.GithubId,
-                                               LabelName:    label.Name,
-                                       })
-                                       // if pr.Type has not been set and 
prType is set in .env, process the below
-                                       if labelTypeRegex != nil && 
labelTypeRegex.MatchString(label.Name) {
-                                               githubPr.Type = label.Name
-                                       }
-                                       // if pr.Component has not been set and 
prComponent is set in .env, process
-                                       if labelComponentRegex != nil && 
labelComponentRegex.MatchString(label.Name) {
-                                               githubPr.Component = label.Name
+                               // if pr.Component has not been set and 
prComponent is set in .env, process
+                               if labelComponentRegex != nil && 
labelComponentRegex.MatchString(label.Name) {
+                                       githubPr.Component = label.Name
 
-                                       }
-                               }
-                               results = append(results, githubPr)
-
-                               for _, apiPullRequestReview := range 
rawL.Reviews.Nodes {
-                                       if apiPullRequestReview.State != 
"PENDING" {
-                                               githubPrReview := 
&models.GithubPrReview{
-                                                       ConnectionId:   
data.Options.ConnectionId,
-                                                       GithubId:       
apiPullRequestReview.DatabaseId,
-                                                       Body:           
apiPullRequestReview.Body,
-                                                       State:          
apiPullRequestReview.State,
-                                                       CommitSha:      
apiPullRequestReview.Commit.Oid,
-                                                       GithubSubmitAt: 
apiPullRequestReview.SubmittedAt,
-
-                                                       PullRequestId: 
githubPr.GithubId,
-                                               }
-
-                                               if apiPullRequestReview.Author 
!= nil {
-                                                       
githubPrReview.AuthorUserId = apiPullRequestReview.Author.Id
-                                                       
githubPrReview.AuthorUsername = apiPullRequestReview.Author.Login
-                                                       
extractGraphqlPreAccount(&results, apiPullRequestReview.Author, 
data.Options.GithubId, data.Options.ConnectionId)
-                                               }
-
-                                               results = append(results, 
githubPrReview)
-                                       }
                                }
-                               for _, apiReviewRequests := range 
rawL.ReviewRequests.Nodes {
-                                       githubReviewRequests := 
&models.GithubReviewer{
-                                               ConnectionId:  
data.Options.ConnectionId,
+                       }
+                       results = append(results, githubPr)
+
+                       for _, apiPullRequestReview := range rawL.Reviews.Nodes 
{
+                               if apiPullRequestReview.State != "PENDING" {
+                                       githubPrReview := 
&models.GithubPrReview{
+                                               ConnectionId:   
data.Options.ConnectionId,
+                                               GithubId:       
apiPullRequestReview.DatabaseId,
+                                               Body:           
apiPullRequestReview.Body,
+                                               State:          
apiPullRequestReview.State,
+                                               CommitSha:      
apiPullRequestReview.Commit.Oid,
+                                               GithubSubmitAt: 
apiPullRequestReview.SubmittedAt,
+
                                                PullRequestId: 
githubPr.GithubId,
-                                               ReviewerId:    
apiReviewRequests.RequestedReviewer.User.Id,
-                                               Username:      
apiReviewRequests.RequestedReviewer.User.Login,
                                        }
-                                       results = append(results, 
githubReviewRequests)
-                               }
 
-                               for _, apiPullRequestCommit := range 
rawL.Commits.Nodes {
-                                       githubCommit, err := 
convertPullRequestCommit(apiPullRequestCommit)
-                                       if err != nil {
-                                               return nil, err
-                                       }
-                                       results = append(results, githubCommit)
-
-                                       githubPullRequestCommit := 
&models.GithubPrCommit{
-                                               ConnectionId:       
data.Options.ConnectionId,
-                                               CommitSha:          
apiPullRequestCommit.Commit.Oid,
-                                               PullRequestId:      
githubPr.GithubId,
-                                               CommitAuthorName:   
githubCommit.AuthorName,
-                                               CommitAuthorEmail:  
githubCommit.AuthorEmail,
-                                               CommitAuthoredDate: 
githubCommit.AuthoredDate,
+                                       if apiPullRequestReview.Author != nil {
+                                               githubPrReview.AuthorUserId = 
apiPullRequestReview.Author.Id
+                                               githubPrReview.AuthorUsername = 
apiPullRequestReview.Author.Login
+                                               
extractGraphqlPreAccount(&results, apiPullRequestReview.Author, 
data.Options.GithubId, data.Options.ConnectionId)
                                        }
-                                       results = append(results, 
githubPullRequestCommit)
-                                       extractGraphqlPreAccount(&results, 
apiPullRequestCommit.Commit.Author.User, data.Options.GithubId, 
data.Options.ConnectionId)
+
+                                       results = append(results, 
githubPrReview)
+                               }
+                       }
+                       for _, apiReviewRequests := range 
rawL.ReviewRequests.Nodes {
+                               githubReviewRequests := &models.GithubReviewer{
+                                       ConnectionId:  
data.Options.ConnectionId,
+                                       PullRequestId: githubPr.GithubId,
+                                       ReviewerId:    
apiReviewRequests.RequestedReviewer.User.Id,
+                                       Username:      
apiReviewRequests.RequestedReviewer.User.Login,
+                               }
+                               results = append(results, githubReviewRequests)
+                       }
+
+                       for _, apiPullRequestCommit := range rawL.Commits.Nodes 
{
+                               githubCommit, err := 
convertPullRequestCommit(apiPullRequestCommit)
+                               if err != nil {
+                                       return nil, err
+                               }
+                               results = append(results, githubCommit)
+
+                               githubPullRequestCommit := 
&models.GithubPrCommit{
+                                       ConnectionId:       
data.Options.ConnectionId,
+                                       CommitSha:          
apiPullRequestCommit.Commit.Oid,
+                                       PullRequestId:      githubPr.GithubId,
+                                       CommitAuthorName:   
githubCommit.AuthorName,
+                                       CommitAuthorEmail:  
githubCommit.AuthorEmail,
+                                       CommitAuthoredDate: 
githubCommit.AuthoredDate,
                                }
+                               results = append(results, 
githubPullRequestCommit)
+                               extractGraphqlPreAccount(&results, 
apiPullRequestCommit.Commit.Author.User, data.Options.GithubId, 
data.Options.ConnectionId)
                        }
                        return results, nil
                },
@@ -161,7 +158,7 @@ func ExtractPrs(taskCtx plugin.SubTaskContext) errors.Error 
{
        return extractor.Execute()
 }
 
-func convertGithubPullRequest(pull GraphqlQueryPr, connId uint64, repoId int) 
(*models.GithubPullRequest, errors.Error) {
+func convertGithubPullRequest(pull *GraphqlQueryPr, connId uint64, repoId int) 
(*models.GithubPullRequest, errors.Error) {
        githubPull := &models.GithubPullRequest{
                ConnectionId:    connId,
                GithubId:        pull.DatabaseId,
diff --git a/backend/plugins/github_graphql/tasks/release_collector.go 
b/backend/plugins/github_graphql/tasks/release_collector.go
index 4b71626d0..0090bd60c 100644
--- a/backend/plugins/github_graphql/tasks/release_collector.go
+++ b/backend/plugins/github_graphql/tasks/release_collector.go
@@ -18,6 +18,7 @@ limitations under the License.
 package tasks
 
 import (
+       "encoding/json"
        "strings"
        "time"
 
@@ -97,6 +98,7 @@ func CollectRelease(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
+       since := apiCollector.GetSince()
        err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{
                GraphqlClient: data.GraphqlClient,
                PageSize:      100,
@@ -119,15 +121,16 @@ func CollectRelease(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query := iQuery.(*GraphqlQueryReleaseWrapper)
                        return query.Repository.Releases.PageInfo, nil
                },
-               ResponseParser: func(iQuery interface{}, variables 
map[string]interface{}) ([]interface{}, error) {
-                       query := iQuery.(*GraphqlQueryReleaseWrapper)
-                       deployments := query.Repository.Releases.Releases
-                       for _, rawL := range deployments {
-                               if apiCollector.GetSince() != nil && 
!apiCollector.GetSince().Before(rawL.UpdatedAt) {
-                                       return nil, helper.ErrFinishCollect
+               ResponseParser: func(queryWrapper any) (messages 
[]json.RawMessage, err errors.Error) {
+                       query := queryWrapper.(*GraphqlQueryReleaseWrapper)
+                       releases := query.Repository.Releases.Releases
+                       for _, rawL := range releases {
+                               if since != nil && since.After(rawL.UpdatedAt) {
+                                       return messages, helper.ErrFinishCollect
                                }
+                               messages = append(messages, 
errors.Must1(json.Marshal(rawL)))
                        }
-                       return nil, nil
+                       return
                },
        })
        if err != nil {

Reply via email to