This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new c6f074ac Issues/3521 some graphql bug fixes (#3601)
c6f074ac is described below

commit c6f074ac182b59588d4db74d7ed2aa3cd8827f21
Author: Likyh <[email protected]>
AuthorDate: Mon Oct 31 12:30:43 2022 +0800

    Issues/3521 some graphql bug fixes (#3601)
    
    * fix: change how errors are handled
    
    * feat: add close method to finish rateLimit timer
    
    * feat: only sync updated github users to domain layer
    
    * feat: support 2 plugins in make dev
    
    * fix: replace \0 with '<0x00>' in github issue body
    
    * feat: add retry logic for graphql
    
    * feat: handle data errors in github graphql users collector
    
    * fix: fix the bug where the first user in each page was dropped
    
    Co-authored-by: linyh <[email protected]>
---
 go.mod                                            |   2 +-
 go.sum                                            |   2 +
 plugins/github/tasks/account_convertor.go         |  14 +-
 plugins/github_graphql/plugin_main.go             |  23 +++-
 plugins/github_graphql/tasks/account_collector.go |  13 +-
 plugins/github_graphql/tasks/issue_collector.go   |   3 +-
 plugins/helper/graphql_async_client.go            | 149 ++++++++++++++--------
 plugins/helper/graphql_collector.go               | 114 +++++++++++------
 scripts/compile-plugins.sh                        |   7 +
 9 files changed, 229 insertions(+), 98 deletions(-)

diff --git a/go.mod b/go.mod
index 0fc69e61..96192cd9 100644
--- a/go.mod
+++ b/go.mod
@@ -15,7 +15,7 @@ require (
        github.com/libgit2/git2go/v33 v33.0.6
        github.com/magiconair/properties v1.8.5
        github.com/manifoldco/promptui v0.9.0
-       github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2
+       github.com/merico-dev/graphql v0.0.0-20221027131946-77460a1fd4cd
        github.com/mitchellh/mapstructure v1.4.1
        github.com/panjf2000/ants/v2 v2.4.6
        github.com/robfig/cron/v3 v3.0.0
diff --git a/go.sum b/go.sum
index 1d105828..8f17cd22 100644
--- a/go.sum
+++ b/go.sum
@@ -542,6 +542,8 @@ github.com/mediocregopher/radix/v3 v3.3.0/go.mod 
h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQ
 github.com/mediocregopher/radix/v3 v3.4.2/go.mod 
h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
 github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2 
h1:sOXuZIg3OwBnvJFfIuO8wegiLpeDCOSVvk2dsbjurd8=
 github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2/go.mod 
h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ=
+github.com/merico-dev/graphql v0.0.0-20221027131946-77460a1fd4cd 
h1:hGQXd4a72JSFIZE+ZVkH5ivE925PGogjob6stgc2too=
+github.com/merico-dev/graphql v0.0.0-20221027131946-77460a1fd4cd/go.mod 
h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ=
 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d 
h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI=
 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod 
h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
 github.com/microcosm-cc/bluemonday v1.0.2/go.mod 
h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
diff --git a/plugins/github/tasks/account_convertor.go 
b/plugins/github/tasks/account_convertor.go
index e8643226..328de4a0 100644
--- a/plugins/github/tasks/account_convertor.go
+++ b/plugins/github/tasks/account_convertor.go
@@ -52,7 +52,19 @@ func ConvertAccounts(taskCtx core.SubTaskContext) 
errors.Error {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*GithubTaskData)
 
-       cursor, err := db.Cursor(dal.From(&githubModels.GithubAccount{}), 
dal.Where("connection_id = ?", data.Options.ConnectionId))
+       cursor, err := db.Cursor(
+               dal.Select("_tool_github_accounts.*"),
+               dal.From(&githubModels.GithubAccount{}),
+               dal.Where(
+                       "repo_github_id = ? and 
_tool_github_accounts.connection_id=?",
+                       data.Repo.GithubId,
+                       data.Options.ConnectionId,
+               ),
+               dal.Join(`left join _tool_github_repo_accounts gra on (
+                       _tool_github_accounts.connection_id = gra.connection_id
+                       AND _tool_github_accounts.id = gra.account_id
+               )`),
+       )
        if err != nil {
                return err
        }
diff --git a/plugins/github_graphql/plugin_main.go 
b/plugins/github_graphql/plugin_main.go
index 937db7e4..a9ad9d85 100644
--- a/plugins/github_graphql/plugin_main.go
+++ b/plugins/github_graphql/plugin_main.go
@@ -19,6 +19,7 @@ package main
 
 import (
        "context"
+       "fmt"
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/github/models"
@@ -71,7 +72,7 @@ func (plugin GithubGraphql) SubTaskMetas() []core.SubTaskMeta 
{
                githubTasks.ExtractApiPrReviewCommentsMeta,
 
                tasks.CollectAccountMeta,
-
+               
                githubTasks.ConvertJobsMeta,
                githubTasks.EnrichPullRequestIssuesMeta,
                githubTasks.ConvertRepoMeta,
@@ -127,17 +128,23 @@ func (plugin GithubGraphql) PrepareTaskData(taskCtx 
core.TaskContext, options ma
        )
        httpClient := oauth2.NewClient(taskCtx.GetContext(), src)
        client := graphql.NewClient(connection.Endpoint+`graphql`, httpClient)
-       graphqlClient := helper.CreateAsyncGraphqlClient(taskCtx.GetContext(), 
client, taskCtx.GetLogger(),
+       graphqlClient, err := helper.CreateAsyncGraphqlClient(taskCtx, client, 
taskCtx.GetLogger(),
                func(ctx context.Context, client *graphql.Client, logger 
core.Logger) (rateRemaining int, resetAt *time.Time, err errors.Error) {
                        var query GraphQueryRateLimit
-                       err = errors.Convert(client.Query(taskCtx.GetContext(), 
&query, nil))
+                       dataErrors, err := 
errors.Convert01(client.Query(taskCtx.GetContext(), &query, nil))
                        if err != nil {
                                return 0, nil, err
                        }
+                       if dataErrors != nil && len(dataErrors) > 0 {
+                               return 0, nil, 
errors.Default.Wrap(dataErrors[0], `query rate limit fail`)
+                       }
                        logger.Info(`github graphql init success with remaining 
%d/%d and will reset at %s`,
                                query.RateLimit.Remaining, 
query.RateLimit.Limit, query.RateLimit.ResetAt)
                        return int(query.RateLimit.Remaining), 
&query.RateLimit.ResetAt, nil
                })
+       if err != nil {
+               return nil, err
+       }
 
        graphqlClient.SetGetRateCost(func(q interface{}) int {
                v := reflect.ValueOf(q)
@@ -165,6 +172,16 @@ func (plugin GithubGraphql) ApiResources() 
map[string]map[string]core.ApiResourc
        return nil
 }
 
+func (plugin GithubGraphql) Close(taskCtx core.TaskContext) errors.Error {
+       data, ok := taskCtx.GetData().(*githubTasks.GithubTaskData)
+       if !ok {
+               return errors.Default.New(fmt.Sprintf("GetData failed when try 
to close %+v", taskCtx))
+       }
+       data.ApiClient.Release()
+       data.GraphqlClient.Release()
+       return nil
+}
+
 // standalone mode for debugging
 func main() {
        cmd := &cobra.Command{Use: "githubGraphql"}
diff --git a/plugins/github_graphql/tasks/account_collector.go 
b/plugins/github_graphql/tasks/account_collector.go
index d095cdd7..89e05df8 100644
--- a/plugins/github_graphql/tasks/account_collector.go
+++ b/plugins/github_graphql/tasks/account_collector.go
@@ -96,10 +96,9 @@ func CollectAccount(taskCtx core.SubTaskContext) 
errors.Error {
                        },
                        Table: RAW_ACCOUNTS_TABLE,
                },
-               IgnoreQueryErr: true,
-               Input:          iterator,
-               InputStep:      100,
-               GraphqlClient:  data.GraphqlClient,
+               Input:         iterator,
+               InputStep:     100,
+               GraphqlClient: data.GraphqlClient,
                BuildQuery: func(reqData *helper.GraphqlRequestData) 
(interface{}, map[string]interface{}, error) {
                        accounts := reqData.Input.([]interface{})
                        query := &GraphqlQueryAccountWrapper{}
@@ -115,7 +114,11 @@ func CollectAccount(taskCtx core.SubTaskContext) 
errors.Error {
                        }
                        return query, variables, nil
                },
-               ResponseParser: func(iQuery interface{}, variables 
map[string]interface{}) ([]interface{}, error) {
+               ResponseParserWithDataErrors: func(iQuery interface{}, 
variables map[string]interface{}, dataErrors []graphql.DataError) 
([]interface{}, error) {
+                       for _, dataError := range dataErrors {
+                               // log and ignore
+                               taskCtx.GetLogger().Warn(dataError, `query user 
get error but ignore`)
+                       }
                        query := iQuery.(*GraphqlQueryAccountWrapper)
                        accounts := query.Users
 
diff --git a/plugins/github_graphql/tasks/issue_collector.go 
b/plugins/github_graphql/tasks/issue_collector.go
index f54e4cee..9ceb85a3 100644
--- a/plugins/github_graphql/tasks/issue_collector.go
+++ b/plugins/github_graphql/tasks/issue_collector.go
@@ -25,6 +25,7 @@ import (
        githubTasks "github.com/apache/incubator-devlake/plugins/github/tasks"
        "github.com/apache/incubator-devlake/plugins/helper"
        "github.com/merico-dev/graphql"
+       "strings"
        "time"
 )
 
@@ -170,7 +171,7 @@ func convertGithubIssue(issue GraphqlQueryIssue, 
connectionId uint64, repository
                Number:          issue.Number,
                State:           issue.State,
                Title:           issue.Title,
-               Body:            issue.Body,
+               Body:            strings.ReplaceAll(issue.Body, "\x00", 
`<0x00>`),
                Url:             issue.Url,
                ClosedAt:        issue.ClosedAt,
                GithubCreatedAt: issue.CreatedAt,
diff --git a/plugins/helper/graphql_async_client.go 
b/plugins/helper/graphql_async_client.go
index fcd46aef..765bf7d3 100644
--- a/plugins/helper/graphql_async_client.go
+++ b/plugins/helper/graphql_async_client.go
@@ -19,23 +19,27 @@ package helper
 
 import (
        "context"
+       "fmt"
+       "sync"
+       "time"
+
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/utils"
        "github.com/merico-dev/graphql"
-       "sync"
-       "time"
 )
 
 // GraphqlAsyncClient send graphql one by one
 type GraphqlAsyncClient struct {
-       ctx          context.Context
-       cancel       context.CancelFunc
-       client       *graphql.Client
-       logger       core.Logger
-       mu           sync.Mutex
-       waitGroup    sync.WaitGroup
-       workerErrors []error
+       ctx       context.Context
+       cancel    context.CancelFunc
+       client    *graphql.Client
+       logger    core.Logger
+       mu        sync.Mutex
+       waitGroup sync.WaitGroup
 
+       maxRetry         int
+       waitBeforeRetry  time.Duration
        rateExhaustCond  *sync.Cond
        rateRemaining    int
        getRateRemaining func(context.Context, *graphql.Client, core.Logger) 
(rateRemaining int, resetAt *time.Time, err errors.Error)
@@ -44,12 +48,12 @@ type GraphqlAsyncClient struct {
 
 // CreateAsyncGraphqlClient creates a new GraphqlAsyncClient
 func CreateAsyncGraphqlClient(
-       ctx context.Context,
+       taskCtx core.TaskContext,
        graphqlClient *graphql.Client,
        logger core.Logger,
        getRateRemaining func(context.Context, *graphql.Client, core.Logger) 
(rateRemaining int, resetAt *time.Time, err errors.Error),
-) *GraphqlAsyncClient {
-       ctxWithCancel, cancel := context.WithCancel(ctx)
+) (*GraphqlAsyncClient, errors.Error) {
+       ctxWithCancel, cancel := context.WithCancel(taskCtx.GetContext())
        graphqlAsyncClient := &GraphqlAsyncClient{
                ctx:              ctxWithCancel,
                cancel:           cancel,
@@ -59,14 +63,48 @@ func CreateAsyncGraphqlClient(
                rateRemaining:    0,
                getRateRemaining: getRateRemaining,
        }
+
        if getRateRemaining != nil {
-               rateRemaining, resetAt, err := getRateRemaining(ctx, 
graphqlClient, logger)
+               rateRemaining, resetAt, err := 
getRateRemaining(taskCtx.GetContext(), graphqlClient, logger)
                if err != nil {
                        panic(err)
                }
                graphqlAsyncClient.updateRateRemaining(rateRemaining, resetAt)
        }
-       return graphqlAsyncClient
+
+       // load retry/timeout from configuration
+       // use API_RETRY as max retry time
+       // use API_TIMEOUT as retry before wait seconds to confirm the prev 
request finish
+       timeout := 30 * time.Second
+       retry, err := utils.StrToIntOr(taskCtx.GetConfig("API_RETRY"), 3)
+       if err != nil {
+               return nil, errors.BadInput.Wrap(err, "failed to parse 
API_RETRY")
+       }
+       timeoutConf := taskCtx.GetConfig("API_TIMEOUT")
+       if timeoutConf != "" {
+               // override timeout value if API_TIMEOUT is provided
+               timeout, err = errors.Convert01(time.ParseDuration(timeoutConf))
+               if err != nil {
+                       return nil, errors.BadInput.Wrap(err, "failed to parse 
API_TIMEOUT")
+               }
+       }
+       graphqlAsyncClient.SetMaxRetry(retry, timeout)
+
+       return graphqlAsyncClient, nil
+}
+
+// GetMaxRetry returns the maximum retry attempts for a request
+func (apiClient *GraphqlAsyncClient) GetMaxRetry() (int, time.Duration) {
+       return apiClient.maxRetry, apiClient.waitBeforeRetry
+}
+
+// SetMaxRetry sets the maximum retry attempts for a request
+func (apiClient *GraphqlAsyncClient) SetMaxRetry(
+       maxRetry int,
+       waitBeforeRetry time.Duration,
+) {
+       apiClient.maxRetry = maxRetry
+       apiClient.waitBeforeRetry = waitBeforeRetry
 }
 
 // updateRateRemaining call getRateRemaining to update rateRemaining 
periodically
@@ -80,12 +118,16 @@ func (apiClient *GraphqlAsyncClient) 
updateRateRemaining(rateRemaining int, rese
                if resetAt != nil && resetAt.After(time.Now()) {
                        nextDuring = time.Until(*resetAt)
                }
-               <-time.After(nextDuring)
-               newRateRemaining, newResetAt, err := 
apiClient.getRateRemaining(apiClient.ctx, apiClient.client, apiClient.logger)
-               if err != nil {
-                       panic(err)
+               select {
+               case <-apiClient.ctx.Done():
+                       return
+               case <-time.After(nextDuring):
+                       newRateRemaining, newResetAt, err := 
apiClient.getRateRemaining(apiClient.ctx, apiClient.client, apiClient.logger)
+                       if err != nil {
+                               panic(err)
+                       }
+                       apiClient.updateRateRemaining(newRateRemaining, 
newResetAt)
                }
-               apiClient.updateRateRemaining(newRateRemaining, newResetAt)
        }()
 }
 
@@ -96,7 +138,9 @@ func (apiClient *GraphqlAsyncClient) 
SetGetRateCost(getRateCost func(q interface
 }
 
 // Query send a graphql request when get lock
-func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables 
map[string]interface{}) errors.Error {
+// []graphql.DataError are the errors returned in response body
+// errors.Error is other error
+func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables 
map[string]interface{}) ([]graphql.DataError, errors.Error) {
        apiClient.waitGroup.Add(1)
        defer apiClient.waitGroup.Done()
        apiClient.mu.Lock()
@@ -108,26 +152,40 @@ func (apiClient *GraphqlAsyncClient) Query(q interface{}, 
variables map[string]i
                apiClient.logger.Info(`rate limit remaining exhausted, waiting 
for next period.`)
                apiClient.rateExhaustCond.Wait()
        }
-       select {
-       case <-apiClient.ctx.Done():
-               return nil
-       default:
-               err := apiClient.client.Query(apiClient.ctx, q, variables)
-               if err != nil {
-                       return errors.Default.Wrap(err, "error making GraphQL 
call")
-               }
-               cost := 1
-               if apiClient.getRateCost != nil {
-                       cost = apiClient.getRateCost(q)
+
+       retryTime := 0
+       var err error
+       //  if it needs retry, check and retry
+       for retryTime < apiClient.maxRetry {
+               select {
+               case <-apiClient.ctx.Done():
+                       return nil, nil
+               default:
+                       var dataErrors []graphql.DataError
+                       dataErrors, err := 
apiClient.client.Query(apiClient.ctx, q, variables)
+                       if err != nil {
+                               apiClient.logger.Warn(err, "retry #%d graphql 
calling after %ds", retryTime, apiClient.waitBeforeRetry/time.Second)
+                               retryTime++
+                               <-time.After(apiClient.waitBeforeRetry)
+                               continue
+                       }
+                       if dataErrors != nil {
+                               return dataErrors, nil
+                       }
+                       cost := 1
+                       if apiClient.getRateCost != nil {
+                               cost = apiClient.getRateCost(q)
+                       }
+                       apiClient.rateRemaining -= cost
+                       apiClient.logger.Debug(`query cost %d in %v`, cost, 
variables)
+                       return nil, nil
                }
-               apiClient.rateRemaining -= cost
-               apiClient.logger.Debug(`query cost %d in %v`, cost, variables)
-               return nil
        }
+       return nil, errors.Default.Wrap(err, fmt.Sprintf("got error when 
querying GraphQL (from the %dth retry)", retryTime))
 }
 
 // NextTick to return the NextTick of scheduler
-func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error) {
+func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error, 
taskErrorChecker func(err errors.Error)) {
        // to make sure task will be enqueued
        apiClient.waitGroup.Add(1)
        go func() {
@@ -140,29 +198,18 @@ func (apiClient *GraphqlAsyncClient) NextTick(task func() 
errors.Error) {
                                // But if done out of this go func, so task 
will run after waitGroup finish
                                // I have no idea about this now...
                                defer apiClient.waitGroup.Done()
-                               apiClient.checkError(task())
+                               taskErrorChecker(task())
                        }()
                }
        }()
 }
 
 // Wait blocks until all async requests were done
-func (apiClient *GraphqlAsyncClient) Wait() errors.Error {
+func (apiClient *GraphqlAsyncClient) Wait() {
        apiClient.waitGroup.Wait()
-       if len(apiClient.workerErrors) > 0 {
-               return errors.Default.Combine(apiClient.workerErrors)
-       }
-       return nil
-}
-
-func (apiClient *GraphqlAsyncClient) checkError(err errors.Error) {
-       if err == nil {
-               return
-       }
-       apiClient.workerErrors = append(apiClient.workerErrors, err)
 }
 
-// HasError return if any error occurred
-func (apiClient *GraphqlAsyncClient) HasError() bool {
-       return len(apiClient.workerErrors) > 0
+// Release will release the ApiAsyncClient with scheduler
+func (apiClient *GraphqlAsyncClient) Release() {
+       apiClient.cancel()
 }
diff --git a/plugins/helper/graphql_collector.go 
b/plugins/helper/graphql_collector.go
index 07999394..50270186 100644
--- a/plugins/helper/graphql_collector.go
+++ b/plugins/helper/graphql_collector.go
@@ -65,18 +65,20 @@ type GraphqlCollectorArgs struct {
        Input Iterator
        // how many times fetched from input, default 1 means only fetch once
        // NOTICE: InputStep=1 will fill value as item and InputStep>1 will 
fill value as []item
-       InputStep      int
-       IgnoreQueryErr bool
+       InputStep int
        // GetPageInfo is to tell `GraphqlCollector` is page information
-       GetPageInfo    func(query interface{}, args *GraphqlCollectorArgs) 
(*GraphqlQueryPageInfo, error)
-       BatchSize      int
-       ResponseParser func(query interface{}, variables 
map[string]interface{}) ([]interface{}, error)
+       GetPageInfo func(query interface{}, args *GraphqlCollectorArgs) 
(*GraphqlQueryPageInfo, error)
+       BatchSize   int
+       // one of ResponseParser and ResponseParserEvenWhenDataErrors is 
required to parse response
+       ResponseParser               func(query interface{}, variables 
map[string]interface{}) ([]interface{}, error)
+       ResponseParserWithDataErrors func(query interface{}, variables 
map[string]interface{}, dataErrors []graphql.DataError) ([]interface{}, error)
 }
 
 // GraphqlCollector help you collect data from Graphql services
 type GraphqlCollector struct {
        *RawDataSubTask
-       args *GraphqlCollectorArgs
+       args         *GraphqlCollectorArgs
+       workerErrors []error
 }
 
 // NewGraphqlCollector allocates a new GraphqlCollector with the given args.
@@ -94,8 +96,8 @@ func NewGraphqlCollector(args GraphqlCollectorArgs) 
(*GraphqlCollector, errors.E
        if args.GraphqlClient == nil {
                return nil, errors.Default.New("ApiClient is required")
        }
-       if args.ResponseParser == nil {
-               return nil, errors.Default.New("ResponseParser is required")
+       if args.ResponseParser == nil && args.ResponseParserWithDataErrors == 
nil {
+               return nil, errors.Default.New("one of ResponseParser and 
ResponseParserWithDataErrors is required")
        }
        apicllector := &GraphqlCollector{
                RawDataSubTask: rawDataSubTask,
@@ -143,23 +145,30 @@ func (collector *GraphqlCollector) Execute() errors.Error 
{
        if collector.args.Input != nil {
                iterator := collector.args.Input
                defer iterator.Close()
-               apiClient := collector.args.GraphqlClient
-               for iterator.HasNext() && !apiClient.HasError() {
-                       if collector.args.InputStep == 1 {
+               // the comment about difference is written at 
GraphqlCollectorArgs.InputStep
+               if collector.args.InputStep == 1 {
+                       for iterator.HasNext() && !collector.HasError() {
                                input, err := iterator.Fetch()
                                if err != nil {
+                                       collector.checkError(err)
                                        break
                                }
                                collector.exec(divider, input)
-                       } else {
+                       }
+               } else {
+                       for !collector.HasError() {
                                var inputs []interface{}
                                for i := 0; i < collector.args.InputStep && 
iterator.HasNext(); i++ {
                                        input, err := iterator.Fetch()
                                        if err != nil {
+                                               collector.checkError(err)
                                                break
                                        }
                                        inputs = append(inputs, input)
                                }
+                               if inputs == nil {
+                                       break
+                               }
                                collector.exec(divider, inputs)
                        }
                }
@@ -169,23 +178,25 @@ func (collector *GraphqlCollector) Execute() errors.Error 
{
        }
 
        logger.Debug("wait for all async api to finished")
+       collector.args.GraphqlClient.Wait()
 
-       err = collector.args.GraphqlClient.Wait()
-       if err != nil {
-               err = errors.Default.Wrap(err, "ended API collector execution 
with error")
-               logger.Error(err, "")
+       if collector.HasError() {
+               err = errors.Default.Combine(collector.workerErrors)
+               logger.Error(err, "ended Graphql collector execution with 
error")
+               logger.Error(collector.workerErrors[0], "the first error of 
them")
+               return err
        } else {
                logger.Info("ended api collection without error")
        }
-       err = divider.Close()
 
+       err = divider.Close()
        return err
 }
 
 func (collector *GraphqlCollector) exec(divider *BatchSaveDivider, input 
interface{}) {
        inputJson, err := json.Marshal(input)
        if err != nil {
-               panic(err)
+               collector.checkError(errors.Default.Wrap(err, `input can not be 
marshal to json`))
        }
        reqData := new(GraphqlRequestData)
        reqData.Input = input
@@ -210,6 +221,9 @@ func (collector *GraphqlCollector) fetchOneByOne(divider 
*BatchSaveDivider, reqD
                if err != nil {
                        return errors.Default.Wrap(err, "fetchPagesDetermined 
get totalPages failed")
                }
+               if pageInfo == nil {
+                       return errors.Default.New("fetchPagesDetermined got 
pageInfo is nil")
+               }
                if pageInfo.HasNextPage {
                        collector.args.GraphqlClient.NextTick(func() 
errors.Error {
                                reqDataTemp := &GraphqlRequestData{
@@ -222,7 +236,7 @@ func (collector *GraphqlCollector) fetchOneByOne(divider 
*BatchSaveDivider, reqD
                                }
                                collector.fetchAsync(divider, reqDataTemp, 
fetchNextPage)
                                return nil
-                       })
+                       }, collector.checkError)
                }
                return nil
        }
@@ -238,30 +252,37 @@ func (collector *GraphqlCollector) fetchAsync(divider 
*BatchSaveDivider, reqData
        }
        query, variables, err := collector.args.BuildQuery(reqData)
        if err != nil {
-               panic(err)
+               collector.checkError(errors.Default.Wrap(err, `graphql 
collector BuildQuery failed`))
+               return
        }
 
        logger := collector.args.Ctx.GetLogger()
-       err = collector.args.GraphqlClient.Query(query, variables)
+       dataErrors, err := collector.args.GraphqlClient.Query(query, variables)
        if err != nil {
-               if collector.args.IgnoreQueryErr {
-                       logger.Error(err, "fetchAsync failed")
+               collector.checkError(errors.Default.Wrap(err, `graphql query 
failed`))
+               return
+       }
+       if dataErrors != nil && len(dataErrors) > 0 {
+               if collector.args.ResponseParserWithDataErrors == nil {
+                       collector.checkError(errors.Default.Wrap(err, `graphql 
query got error`))
                        return
                } else {
-                       panic(err)
+                       // error will deal by ResponseParserWithDataErrors
                }
        }
        defer logger.Debug("fetchAsync >>> done for %v %v", query, variables)
 
        paramsBytes, err := json.Marshal(query)
        if err != nil {
-               panic(err)
+               collector.checkError(errors.Default.Wrap(err, `graphql 
collector marshal query failed`))
+               return
        }
        db := collector.args.Ctx.GetDal()
        queryStr, _ := graphql.ConstructQuery(query, variables)
        variablesJson, err := json.Marshal(variables)
        if err != nil {
-               panic(err)
+               collector.checkError(errors.Default.Wrap(err, `variables in 
graphql query can not marshal to json`))
+               return
        }
        row := &RawData{
                Params: collector.params,
@@ -271,12 +292,21 @@ func (collector *GraphqlCollector) fetchAsync(divider 
*BatchSaveDivider, reqData
        }
        err = db.Create(row, dal.From(collector.table))
        if err != nil {
-               panic(err)
+               collector.checkError(errors.Default.Wrap(err, `not created row 
table in graphql collector`))
+               return
        }
 
-       results, err := collector.args.ResponseParser(query, variables)
+       var (
+               results []interface{}
+       )
+       if dataErrors != nil && len(dataErrors) > 0 || 
collector.args.ResponseParser == nil {
+               results, err = 
collector.args.ResponseParserWithDataErrors(query, variables, dataErrors)
+       } else {
+               results, err = collector.args.ResponseParser(query, variables)
+       }
        if err != nil {
-               panic(err)
+               collector.checkError(errors.Default.Wrap(err, `not parsed 
response in graphql collector`))
+               return
        }
 
        RAW_DATA_ORIGIN := "RawDataOrigin"
@@ -285,7 +315,8 @@ func (collector *GraphqlCollector) fetchAsync(divider 
*BatchSaveDivider, reqData
                // get the batch operator for the specific type
                batch, err := divider.ForType(reflect.TypeOf(result))
                if err != nil {
-                       panic(err)
+                       collector.checkError(err)
+                       return
                }
                // set raw data origin field
                origin := 
reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN)
@@ -299,20 +330,31 @@ func (collector *GraphqlCollector) fetchAsync(divider 
*BatchSaveDivider, reqData
                // records get saved into db when slots were max outed
                err = batch.Add(result)
                if err != nil {
-                       panic(err)
+                       collector.checkError(err)
+                       return
                }
                collector.args.Ctx.IncProgress(1)
        }
-       if err != nil {
-               panic(err)
-       }
        collector.args.Ctx.IncProgress(1)
        if handler != nil {
                err = handler(query)
                if err != nil {
-                       panic(err)
+                       collector.checkError(errors.Default.Wrap(err, `handle 
failed in graphql collector`))
+                       return
                }
        }
 }
 
-var _ core.SubTask = (*ApiCollector)(nil)
+func (collector *GraphqlCollector) checkError(err errors.Error) {
+       if err == nil {
+               return
+       }
+       collector.workerErrors = append(collector.workerErrors, err)
+}
+
+// HasError return if any error occurred
+func (collector *GraphqlCollector) HasError() bool {
+       return len(collector.workerErrors) > 0
+}
+
+var _ core.SubTask = (*GraphqlCollector)(nil)
diff --git a/scripts/compile-plugins.sh b/scripts/compile-plugins.sh
index 36fdd7c7..f14eb74e 100755
--- a/scripts/compile-plugins.sh
+++ b/scripts/compile-plugins.sh
@@ -21,18 +21,21 @@
 #
 # compile specific plugin and fire up api server:
 #   PLUGIN=<PLUGIN_NAME> make dev
+#   PLUGIN=<PLUGIN_NAME> PLUGIN2=<PLUGIN_NAME2> make dev
 #
 # compile all plugins and fire up api server in DEBUG MODE with `delve`:
 #   make debug
 #
 # compile specific plugin and fire up api server in DEBUG MODE with `delve`:
 #   PLUGIN=<PLUGIN_NAME> make dev
+#   PLUGIN=<PLUGIN_NAME> PLUGIN2=<PLUGIN_NAME> make dev
 
 set -e
 
 echo "Usage: "
 echo "  build all plugins:              $0 [golang build flags...]"
 echo "  build and keep one plugin only: PLUGIN=jira $0 [golang build flags...]"
+echo "  build and keep two plugin only: PLUGIN=jira PLUGIN2=github $0 [golang 
build flags...]"
 
 SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
 PLUGIN_SRC_DIR=$SCRIPT_DIR/../plugins
@@ -44,6 +47,10 @@ else
     PLUGINS=$PLUGIN_SRC_DIR/$PLUGIN
 fi
 
+if [ $PLUGIN ] && [ $PLUGIN2 ]; then
+    PLUGINS="$PLUGINS $PLUGIN_SRC_DIR/$PLUGIN2"
+fi
+
 rm -rf $PLUGIN_OUTPUT_DIR/*
 
 PIDS=""

Reply via email to