This is an automated email from the ASF dual-hosted git repository.

zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ebab5a2 feat: graphql POC in github (#2619)
3ebab5a2 is described below

commit 3ebab5a21ae996204a52d1a52f6a864e764250b9
Author: likyh <[email protected]>
AuthorDate: Thu Aug 18 16:55:10 2022 +0800

    feat: graphql POC in github (#2619)
    
    * feat: create the github graphql plugin and create some crude helpers
    
    * fix: fix some bugs
    
    * feat: request 100 users per query
    
    * feat: request events by rest
    
    * fix: record query as url in raw layer
    
    * fix: skip the account which type!='User'
    
    * feat: copy old collector
    
    * fix: ignore collect account not found error
    
    * fix: change TaskData to avoid duplicated collector
    
    Co-authored-by: linyh <[email protected]>
---
 go.mod                                             |   8 +-
 go.sum                                             |  16 +
 plugins/github/tasks/account_collector.go          |   1 -
 plugins/github/tasks/task_data.go                  |   9 +-
 plugins/github_graphql/plugin_main.go              | 171 +++++++++++
 plugins/github_graphql/tasks/account_collector.go  | 166 ++++++++++
 .../tasks/account_graphql_pre_extractor.go         |  47 +++
 .../tasks/account_rest_pre_extractor.go            |  56 ++++
 plugins/github_graphql/tasks/issue_collector.go    | 238 +++++++++++++++
 plugins/github_graphql/tasks/pr_collector.go       | 333 +++++++++++++++++++++
 plugins/github_graphql/tasks/repo_collector.go     | 136 +++++++++
 plugins/helper/graphql_async_client.go             | 165 ++++++++++
 plugins/helper/graphql_collector.go                | 309 +++++++++++++++++++
 13 files changed, 1648 insertions(+), 7 deletions(-)

diff --git a/go.mod b/go.mod
index 9c33f307..7e042d66 100644
--- a/go.mod
+++ b/go.mod
@@ -26,6 +26,7 @@ require (
        go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a
        go.temporal.io/sdk v1.14.0
        golang.org/x/crypto v0.0.0-20210921155107-089bfa567519
+       golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602
        golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
        gorm.io/datatypes v1.0.1
        gorm.io/driver/mysql v1.3.3
@@ -85,6 +86,7 @@ require (
        github.com/mattn/go-colorable v0.1.6 // indirect
        github.com/mattn/go-isatty v0.0.13 // indirect
        github.com/mattn/go-sqlite3 v1.14.6 // indirect
+       github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2 // 
indirect
        github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
        github.com/mitchellh/go-homedir v1.1.0 // indirect
        github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // 
indirect
@@ -97,6 +99,7 @@ require (
        github.com/robfig/cron v1.2.0 // indirect
        github.com/russross/blackfriday/v2 v2.1.0 // indirect
        github.com/sergi/go-diff v1.1.0 // indirect
+       github.com/shurcooL/graphql v0.0.0-20220606043923-3cf50f8a0a29 // 
indirect
        github.com/spf13/cast v1.4.1 // indirect
        github.com/spf13/jwalterweatherman v1.1.0 // indirect
        github.com/spf13/pflag v1.0.6-0.20200504143853-81378bbcd8a1 // indirect
@@ -105,12 +108,13 @@ require (
        github.com/ugorji/go/codec v1.2.6 // indirect
        github.com/xanzy/ssh-agent v0.3.0 // indirect
        go.uber.org/atomic v1.9.0 // indirect
-       golang.org/x/net v0.0.0-20220708220712-1185a9018129 // indirect
-       golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e // indirect
+       golang.org/x/net v0.0.0-20220728211354-c7608f3a8462 // indirect
+       golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect
        golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
        golang.org/x/text v0.3.7 // indirect
        golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
        golang.org/x/tools v0.1.11 // indirect
+       google.golang.org/appengine v1.6.7 // indirect
        google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf // 
indirect
        google.golang.org/grpc v1.44.0 // indirect
        google.golang.org/protobuf v1.27.1 // indirect
diff --git a/go.sum b/go.sum
index faf3b124..e0931d46 100644
--- a/go.sum
+++ b/go.sum
@@ -432,6 +432,12 @@ github.com/mattn/go-isatty v0.0.13/go.mod 
h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
 github.com/mattn/go-sqlite3 v1.14.5/go.mod 
h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI=
 github.com/mattn/go-sqlite3 v1.14.6 
h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
 github.com/mattn/go-sqlite3 v1.14.6/go.mod 
h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
+github.com/merico-dev/graphql v0.0.0-20220606043923-3cf50f8a0a29 
h1:jEarKDWDyd59SKG2AoH/3JYqRFfl7RE2uEzGJRpCl3Q=
+github.com/merico-dev/graphql v0.0.0-20220606043923-3cf50f8a0a29/go.mod 
h1:q+XAHrrzQfwrEaGgnl/VduY0Gd9FqSnMQEpKq101TxE=
+github.com/merico-dev/graphql v0.0.0-20220803162350-42fdc19ba54c 
h1:7l5zeXBLMWafzHD0ElpaNVTVhNzcQz7Urkw9WebNt5o=
+github.com/merico-dev/graphql v0.0.0-20220803162350-42fdc19ba54c/go.mod 
h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ=
+github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2 
h1:sOXuZIg3OwBnvJFfIuO8wegiLpeDCOSVvk2dsbjurd8=
+github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2/go.mod 
h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ=
 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d 
h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI=
 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod 
h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
 github.com/miekg/dns v1.0.14/go.mod 
h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
@@ -510,6 +516,8 @@ github.com/shopspring/decimal 
v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9Nz
 github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc/go.mod 
h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
 github.com/shopspring/decimal v1.2.0 
h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
 github.com/shopspring/decimal v1.2.0/go.mod 
h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
+github.com/shurcooL/graphql v0.0.0-20220606043923-3cf50f8a0a29 
h1:B1PEwpArrNp4dkQrfxh/abbBAOZBVp0ds+fBEOUOqOc=
+github.com/shurcooL/graphql v0.0.0-20220606043923-3cf50f8a0a29/go.mod 
h1:AuYgA5Kyo4c7HfUmvRGs/6rGlMMV/6B1bVnB9JxJEEg=
 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod 
h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
 github.com/sirupsen/logrus v1.4.1/go.mod 
h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
 github.com/sirupsen/logrus v1.4.2/go.mod 
h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
@@ -706,6 +714,10 @@ golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod 
h1:CfG3xpIq0wQ8r1q4Su
 golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod 
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/net v0.0.0-20220708220712-1185a9018129 
h1:vucSRfWwTsoXro7P+3Cjlr6flUMtzCwzlvkxEQtHHB0=
 golang.org/x/net v0.0.0-20220708220712-1185a9018129/go.mod 
h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.0.0-20220728211354-c7608f3a8462 
h1:UreQrH7DbFXSi9ZFox6FNT3WBooWmdANpU+IfkT1T4I=
+golang.org/x/net v0.0.0-20220728211354-c7608f3a8462/go.mod 
h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
+golang.org/x/net v0.0.0-20220802222814-0bcc04d9c69b 
h1:3ogNYyK4oIQdIKzTu68hQrr4iuVxF3AxKl9Aj/eDrw0=
+golang.org/x/net v0.0.0-20220802222814-0bcc04d9c69b/go.mod 
h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod 
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -717,6 +729,7 @@ golang.org/x/oauth2 
v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ
 golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod 
h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93/go.mod 
h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod 
h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
+golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 
h1:0Ja1LBD+yisY6RWM/BH7TJVXWsSjs2VwBSmvSX4HdBc=
 golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602/go.mod 
h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -799,6 +812,8 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod 
h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20220222200937-f2425489ef4c/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e 
h1:NHvCuwuS43lGnYhten69ZWqi2QOj/CiDNcKbVqwVoew=
 golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 
h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg=
+golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod 
h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod 
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 
h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
@@ -913,6 +928,7 @@ google.golang.org/appengine v1.5.0/go.mod 
h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
 google.golang.org/appengine v1.6.1/go.mod 
h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
 google.golang.org/appengine v1.6.5/go.mod 
h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
 google.golang.org/appengine v1.6.6/go.mod 
h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
+google.golang.org/appengine v1.6.7 
h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
 google.golang.org/appengine v1.6.7/go.mod 
h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
 google.golang.org/genproto v0.0.0-20180518175338-11a468237815/go.mod 
h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod 
h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
diff --git a/plugins/github/tasks/account_collector.go 
b/plugins/github/tasks/account_collector.go
index fbeaa279..829a4917 100644
--- a/plugins/github/tasks/account_collector.go
+++ b/plugins/github/tasks/account_collector.go
@@ -76,7 +76,6 @@ func CollectAccounts(taskCtx core.SubTaskContext) error {
                },
                AfterResponse: func(res *http.Response) error {
                        if res.StatusCode == http.StatusNotFound {
-                               println(res.Request.URL)
                                return helper.ErrIgnoreAndContinue
                        }
                        return nil
diff --git a/plugins/github/tasks/task_data.go 
b/plugins/github/tasks/task_data.go
index 1866cdca..14b5980e 100644
--- a/plugins/github/tasks/task_data.go
+++ b/plugins/github/tasks/task_data.go
@@ -36,10 +36,11 @@ type GithubOptions struct {
 }
 
 type GithubTaskData struct {
-       Options   *GithubOptions
-       ApiClient *helper.ApiAsyncClient
-       Since     *time.Time
-       Repo      *models.GithubRepo
+       Options       *GithubOptions
+       ApiClient     *helper.ApiAsyncClient
+       GraphqlClient *helper.GraphqlAsyncClient
+       Since         *time.Time
+       Repo          *models.GithubRepo
 }
 
 func DecodeAndValidateTaskOptions(options map[string]interface{}) 
(*GithubOptions, error) {
diff --git a/plugins/github_graphql/plugin_main.go 
b/plugins/github_graphql/plugin_main.go
new file mode 100755
index 00000000..acd6b20e
--- /dev/null
+++ b/plugins/github_graphql/plugin_main.go
@@ -0,0 +1,171 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/github/models"
+       githubTasks "github.com/apache/incubator-devlake/plugins/github/tasks"
+       "github.com/apache/incubator-devlake/plugins/github_graphql/tasks"
+       "github.com/apache/incubator-devlake/plugins/helper"
+       "github.com/apache/incubator-devlake/runner"
+       "github.com/merico-dev/graphql"
+       "github.com/mitchellh/mapstructure"
+       "github.com/spf13/cobra"
+       "github.com/spf13/viper"
+       "golang.org/x/oauth2"
+       "gorm.io/gorm"
+       "reflect"
+       "strings"
+       "time"
+)
+
+// make sure the plugin interfaces are implemented
+var _ core.PluginMeta = (*GithubGraphql)(nil)
+var _ core.PluginInit = (*GithubGraphql)(nil)
+var _ core.PluginTask = (*GithubGraphql)(nil)
+var _ core.PluginApi = (*GithubGraphql)(nil)
+
+// Export a variable named PluginEntry for the framework to look up and load
+var PluginEntry GithubGraphql //nolint
+
+type GithubGraphql struct{}
+
+func (plugin GithubGraphql) Description() string {
+       return "collect some GithubGraphql data"
+}
+
+func (plugin GithubGraphql) Init(config *viper.Viper, logger core.Logger, db 
*gorm.DB) error {
+       return nil
+}
+
+func (plugin GithubGraphql) SubTaskMetas() []core.SubTaskMeta {
+       return []core.SubTaskMeta{
+               tasks.CollectRepoMeta,
+               tasks.CollectIssueMeta,
+               tasks.CollectPrMeta,
+
+               githubTasks.CollectApiCommentsMeta,
+               githubTasks.ExtractApiCommentsMeta,
+               githubTasks.CollectApiEventsMeta,
+               githubTasks.ExtractApiEventsMeta,
+               githubTasks.CollectMilestonesMeta,
+               githubTasks.ExtractMilestonesMeta,
+               githubTasks.CollectApiPrReviewCommentsMeta,
+               githubTasks.ExtractApiPrReviewCommentsMeta,
+
+               tasks.CollectAccountMeta,
+       }
+}
+
+type GraphQueryRateLimit struct {
+       RateLimit struct {
+               Limit     graphql.Int
+               Remaining graphql.Int
+               ResetAt   time.Time
+       }
+}
+
+func (plugin GithubGraphql) PrepareTaskData(taskCtx core.TaskContext, options 
map[string]interface{}) (interface{}, error) {
+       var op githubTasks.GithubOptions
+       err := mapstructure.Decode(options, &op)
+       if err != nil {
+               return nil, err
+       }
+       if op.Owner == "" {
+               return nil, fmt.Errorf("owner is required for GitHub execution")
+       }
+       if op.Repo == "" {
+               return nil, fmt.Errorf("repo is required for GitHub execution")
+       }
+
+       connectionHelper := helper.NewConnectionHelper(
+               taskCtx,
+               nil,
+       )
+       connection := &models.GithubConnection{}
+       err = connectionHelper.FirstById(connection, op.ConnectionId)
+       if err != nil {
+               return nil, fmt.Errorf("unable to get github connection by the 
given connection ID: %v", err)
+       }
+
+       tokens := strings.Split(connection.Token, ",")
+       src := oauth2.StaticTokenSource(
+               &oauth2.Token{AccessToken: tokens[0]},
+       )
+       httpClient := oauth2.NewClient(taskCtx.GetContext(), src)
+       client := graphql.NewClient(connection.Endpoint+`graphql`, httpClient)
+       graphqlClient := helper.CreateAsyncGraphqlClient(taskCtx.GetContext(), 
client, taskCtx.GetLogger(),
+               func(ctx context.Context, client *graphql.Client, logger 
core.Logger) (rateRemaining int, resetAt *time.Time, err error) {
+                       var query GraphQueryRateLimit
+                       err = client.Query(taskCtx.GetContext(), &query, nil)
+                       if err != nil {
+                               return 0, nil, err
+                       }
+                       logger.Info(`github graphql init success with remaining 
%d/%d and will reset at %s`,
+                               query.RateLimit.Remaining, 
query.RateLimit.Limit, query.RateLimit.ResetAt)
+                       return int(query.RateLimit.Remaining), 
&query.RateLimit.ResetAt, nil
+               })
+
+       graphqlClient.SetGetRateCost(func(q interface{}) int {
+               v := reflect.ValueOf(q)
+               return 
int(v.Elem().FieldByName(`RateLimit`).FieldByName(`Cost`).Int())
+       })
+
+       apiClient, err := githubTasks.CreateApiClient(taskCtx, connection)
+       if err != nil {
+               return nil, fmt.Errorf("unable to get github API client 
instance: %v", err)
+       }
+
+       return &githubTasks.GithubTaskData{
+               Options:       &op,
+               ApiClient:     apiClient,
+               GraphqlClient: graphqlClient,
+       }, nil
+}
+
+// PkgPath information is lost when compiled as a plugin (.so)
+func (plugin GithubGraphql) RootPkgPath() string {
+       return "github.com/apache/incubator-devlake/plugins/githubGraphql"
+}
+
+func (plugin GithubGraphql) ApiResources() 
map[string]map[string]core.ApiResourceHandler {
+       return nil
+}
+
+// standalone mode for debugging
+func main() {
+       cmd := &cobra.Command{Use: "githubGraphql"}
+       connectionId := cmd.Flags().Uint64P("connectionId", "c", 0, "github 
connection id")
+       owner := cmd.Flags().StringP("owner", "o", "", "github owner")
+       repo := cmd.Flags().StringP("repo", "r", "", "github repo")
+       _ = cmd.MarkFlagRequired("connectionId")
+       _ = cmd.MarkFlagRequired("owner")
+       _ = cmd.MarkFlagRequired("repo")
+
+       cmd.Run = func(cmd *cobra.Command, args []string) {
+               runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
+                       "connectionId": *connectionId,
+                       "owner":        *owner,
+                       "repo":         *repo,
+               })
+       }
+       runner.RunCmd(cmd)
+}
diff --git a/plugins/github_graphql/tasks/account_collector.go 
b/plugins/github_graphql/tasks/account_collector.go
new file mode 100755
index 00000000..835eecfe
--- /dev/null
+++ b/plugins/github_graphql/tasks/account_collector.go
@@ -0,0 +1,166 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
+       "github.com/apache/incubator-devlake/plugins/github/models"
+       githubTasks "github.com/apache/incubator-devlake/plugins/github/tasks"
+       "github.com/apache/incubator-devlake/plugins/helper"
+       "github.com/merico-dev/graphql"
+       "reflect"
+)
+
+const RAW_ACCOUNTS_TABLE = "github_graphql_accounts"
+
+type GraphqlQueryAccountWrapper struct {
+       RateLimit struct {
+               Cost int
+       }
+       Users []GraphqlQueryAccount `graphql:"user(login: $login)" 
graphql-extend:"true"`
+}
+
+type GraphqlQueryAccount struct {
+       Login     string
+       Id        int `graphql:"databaseId"`
+       Name      string
+       Company   string
+       Email     string
+       AvatarUrl string
+       HtmlUrl   string `graphql:"url"`
+       //Type      string
+       Organizations struct {
+               Nodes []struct {
+                       Email      string
+                       Name       string
+                       DatabaseId int
+                       Login      string
+               }
+       } `graphql:"organizations(first: 10)"`
+}
+
+var CollectAccountMeta = core.SubTaskMeta{
+       Name:             "CollectAccount",
+       EntryPoint:       CollectAccount,
+       EnabledByDefault: true,
+       Description:      "Collect Account data from GithubGraphql api",
+}
+
+type SimpleAccount struct {
+       Login string
+}
+
+var _ core.SubTaskEntryPoint = CollectAccount
+
+func CollectAccount(taskCtx core.SubTaskContext) error {
+       db := taskCtx.GetDal()
+       data := taskCtx.GetData().(*githubTasks.GithubTaskData)
+
+       cursor, err := db.Cursor(
+               dal.Select("login"),
+               dal.From(models.GithubRepoAccount{}.TableName()),
+               dal.Where("repo_github_id = ? and connection_id=?", 
data.Repo.GithubId, data.Options.ConnectionId),
+       )
+       if err != nil {
+               return err
+       }
+       iterator, err := helper.NewDalCursorIterator(db, cursor, 
reflect.TypeOf(SimpleAccount{}))
+       if err != nil {
+               return err
+       }
+
+       collector, err := 
helper.NewGraphqlCollector(helper.GraphqlCollectorArgs{
+               RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+                       Ctx: taskCtx,
+                       Params: githubTasks.GithubApiParams{
+                               ConnectionId: data.Options.ConnectionId,
+                               Owner:        data.Options.Owner,
+                               Repo:         data.Options.Repo,
+                       },
+                       Table: RAW_ACCOUNTS_TABLE,
+               },
+               IgnoreQueryErr: true,
+               Input:          iterator,
+               InputStep:      100,
+               GraphqlClient:  data.GraphqlClient,
+               BuildQuery: func(reqData *helper.GraphqlRequestData) 
(interface{}, map[string]interface{}, error) {
+                       accounts := reqData.Input.([]interface{})
+                       query := &GraphqlQueryAccountWrapper{}
+                       users := []map[string]interface{}{}
+                       for _, iAccount := range accounts {
+                               account := iAccount.(*SimpleAccount)
+                               users = append(users, map[string]interface{}{
+                                       `login`: graphql.String(account.Login),
+                               })
+                       }
+                       variables := map[string]interface{}{
+                               "user": users,
+                       }
+                       return query, variables, nil
+               },
+               ResponseParser: func(iQuery interface{}, variables 
map[string]interface{}) ([]interface{}, error) {
+                       query := iQuery.(*GraphqlQueryAccountWrapper)
+                       accounts := query.Users
+
+                       results := make([]interface{}, 0, 1)
+                       for _, account := range accounts {
+                               relatedUsers, err := convertAccount(account, 
data.Options.ConnectionId)
+                               if err != nil {
+                                       return nil, err
+                               }
+                               results = append(results, relatedUsers...)
+                       }
+                       return results, nil
+               },
+       })
+
+       if err != nil {
+               return err
+       }
+
+       return collector.Execute()
+}
+
+func convertAccount(res GraphqlQueryAccount, connId uint64) ([]interface{}, 
error) {
+       results := make([]interface{}, 0, len(res.Organizations.Nodes)+1)
+       githubAccount := &models.GithubAccount{
+               ConnectionId: connId,
+               Id:           res.Id,
+               Login:        res.Login,
+               Name:         res.Name,
+               Company:      res.Company,
+               Email:        res.Email,
+               AvatarUrl:    res.AvatarUrl,
+               //Url:          res.Url,
+               HtmlUrl: res.HtmlUrl,
+               Type:    `User`,
+       }
+       results = append(results, githubAccount)
+       for _, apiAccountOrg := range res.Organizations.Nodes {
+               githubAccountOrg := &models.GithubAccountOrg{
+                       ConnectionId: connId,
+                       AccountId:    res.Id,
+                       OrgId:        apiAccountOrg.DatabaseId,
+                       OrgLogin:     apiAccountOrg.Login,
+               }
+               results = append(results, githubAccountOrg)
+       }
+
+       return results, nil
+}
diff --git a/plugins/github_graphql/tasks/account_graphql_pre_extractor.go 
b/plugins/github_graphql/tasks/account_graphql_pre_extractor.go
new file mode 100644
index 00000000..432fc5c7
--- /dev/null
+++ b/plugins/github_graphql/tasks/account_graphql_pre_extractor.go
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+       "github.com/apache/incubator-devlake/plugins/github/models"
+)
+
+type GithubAccountEdge struct {
+       Login     string
+       Id        int `graphql:"databaseId"`
+       Name      string
+       Company   string
+       Email     string
+       AvatarUrl string
+       HtmlUrl   string `graphql:"url"`
+       //Type      string
+}
+type GraphqlInlineAccountQuery struct {
+       GithubAccountEdge `graphql:"... on User"`
+}
+
+func convertGraphqlPreAccount(res GraphqlInlineAccountQuery, repoId int, 
connId uint64) (*models.GithubRepoAccount, error) {
+       githubAccount := &models.GithubRepoAccount{
+               ConnectionId: connId,
+               RepoGithubId: repoId,
+               Login:        res.Login,
+               AccountId:    res.Id,
+       }
+
+       return githubAccount, nil
+}
diff --git a/plugins/github_graphql/tasks/account_rest_pre_extractor.go 
b/plugins/github_graphql/tasks/account_rest_pre_extractor.go
new file mode 100644
index 00000000..54a9c772
--- /dev/null
+++ b/plugins/github_graphql/tasks/account_rest_pre_extractor.go
@@ -0,0 +1,56 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+       "github.com/apache/incubator-devlake/plugins/github/models"
+)
+
+type GithubAccountResponse struct {
+       Login             string `json:"login"`
+       Id                int    `json:"id"`
+       NodeId            string `json:"node_id"`
+       AvatarUrl         string `json:"avatar_url"`
+       GravatarId        string `json:"gravatar_id"`
+       Url               string `json:"url"`
+       HtmlUrl           string `json:"html_url"`
+       FollowersUrl      string `json:"followers_url"`
+       FollowingUrl      string `json:"following_url"`
+       GistsUrl          string `json:"gists_url"`
+       StarredUrl        string `json:"starred_url"`
+       SubscriptionsUrl  string `json:"subscriptions_url"`
+       OrganizationsUrl  string `json:"organizations_url"`
+       ReposUrl          string `json:"repos_url"`
+       EventsUrl         string `json:"events_url"`
+       ReceivedEventsUrl string `json:"received_events_url"`
+       Type              string `json:"type"`
+       SiteAdmin         bool   `json:"site_admin"`
+}
+
+func convertRestPreAccount(res *GithubAccountResponse, repoId int, connId 
uint64) ([]interface{}, error) {
+       if res.Type != `User` {
+               return nil, nil
+       }
+       githubAccount := &models.GithubRepoAccount{
+               ConnectionId: connId,
+               RepoGithubId: repoId,
+               Login:        res.Login,
+               AccountId:    res.Id,
+       }
+       return []interface{}{githubAccount}, nil
+}
diff --git a/plugins/github_graphql/tasks/issue_collector.go 
b/plugins/github_graphql/tasks/issue_collector.go
new file mode 100755
index 00000000..ae61d89e
--- /dev/null
+++ b/plugins/github_graphql/tasks/issue_collector.go
@@ -0,0 +1,238 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+       "github.com/apache/incubator-devlake/models/domainlayer/ticket"
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/github/models"
+       githubTasks "github.com/apache/incubator-devlake/plugins/github/tasks"
+       "github.com/apache/incubator-devlake/plugins/helper"
+       "github.com/merico-dev/graphql"
+       "time"
+)
+
+// RAW_ISSUES_TABLE is the raw table name CollectIssue passes to the collector.
+const RAW_ISSUES_TABLE = "github_graphql_issues"
+
+// GraphqlQueryIssueWrapper is the query struct built by CollectIssue: the
+// rate-limit cost plus one page of issues of the repository selected by the
+// $owner/$name variables. Paging is driven by $pageSize and $skipCursor.
+type GraphqlQueryIssueWrapper struct {
+       RateLimit struct {
+               Cost int
+       }
+       Repository struct {
+               IssueList struct {
+                       TotalCount graphql.Int
+                       Issues     []GraphqlQueryIssue `graphql:"nodes"`
+                       // PageInfo is what CollectIssue's GetPageInfo callback returns.
+                       PageInfo   *helper.GraphqlQueryPageInfo
+               } `graphql:"issues(first: $pageSize, after: $skipCursor)"`
+       } `graphql:"repository(owner: $owner, name: $name)"`
+}
+
+// GraphqlQueryIssue is a single issue node of GraphqlQueryIssueWrapper.
+// Pointer fields (Author, ClosedAt, Milestone) are nil-checked by
+// convertGithubIssue before use.
+type GraphqlQueryIssue struct {
+       DatabaseId   int
+       Number       int
+       State        string
+       StateReason  string
+       Title        string
+       Body         string
+       Author       *GraphqlInlineAccountQuery
+       Url          string
+       ClosedAt     *time.Time
+       CreatedAt    time.Time
+       UpdatedAt    time.Time
+       AssigneeList struct {
+               // FIXME now domain layer just support one assignee
+               Assignees []GraphqlInlineAccountQuery `graphql:"nodes"`
+       } `graphql:"assignees(first: 1)"`
+       Milestone *struct {
+               Number int `json:"number"`
+       } `json:"milestone"`
+       // Labels requests at most the first 100 labels of the issue.
+       Labels struct {
+               Nodes []struct {
+                       Id   string `json:"id"`
+                       Name string `json:"name"`
+               }
+       } `graphql:"labels(first: 100)"`
+}
+
+// CollectIssueMeta describes the CollectIssue subtask; it is enabled by default.
+var CollectIssueMeta = core.SubTaskMeta{
+       Name:             "CollectIssue",
+       EntryPoint:       CollectIssue,
+       EnabledByDefault: true,
+       Description:      "Collect Issue data from GithubGraphql api",
+}
+
+// Compile-time check that CollectIssue has the core.SubTaskEntryPoint signature.
+var _ core.SubTaskEntryPoint = CollectIssue
+
+// CollectIssue builds a graphql collector that pages through the issues of the
+// repository named in the task options (100 per page) and executes it.
+//
+// For every issue the response parser emits, in this order: the issue's label
+// records, the issue itself, the first assignee's account (if any) and the
+// author's account (if any).
+//
+// It returns an error when the issue regexes cannot be built from the
+// transformation rules, when the collector cannot be created, or when
+// executing the collector fails.
+func CollectIssue(taskCtx core.SubTaskContext) error {
+       data := taskCtx.GetData().(*githubTasks.GithubTaskData)
+       config := data.Options.TransformationRules
+       issueRegexes, err := githubTasks.NewIssueRegexes(config)
+       if err != nil {
+               // Report the failure instead of pretending the subtask succeeded
+               // without collecting anything.
+               return err
+       }
+
+       collector, err := helper.NewGraphqlCollector(helper.GraphqlCollectorArgs{
+               RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+                       Ctx: taskCtx,
+                       /*
+                               This struct will be JSON-encoded and stored in the database along with the raw data itself,
+                               to identify the minimal set of data to be processed; for example, we process JiraIssues by Board
+                       */
+                       Params: githubTasks.GithubApiParams{
+                               ConnectionId: data.Options.ConnectionId,
+                               Owner:        data.Options.Owner,
+                               Repo:         data.Options.Repo,
+                       },
+                       Table: RAW_ISSUES_TABLE,
+               },
+               GraphqlClient: data.GraphqlClient,
+               PageSize:      100,
+               // BuildQuery returns the query struct to be filled in and the values
+               // bound to the $variables referenced by its graphql tags.
+               BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
+                       query := &GraphqlQueryIssueWrapper{}
+                       variables := map[string]interface{}{
+                               "pageSize":   graphql.Int(reqData.Pager.Size),
+                               "skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
+                               "owner":      graphql.String(data.Options.Owner),
+                               "name":       graphql.String(data.Options.Repo),
+                       }
+                       return query, variables, nil
+               },
+               GetPageInfo: func(iQuery interface{}, args *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
+                       query := iQuery.(*GraphqlQueryIssueWrapper)
+                       return query.Repository.IssueList.PageInfo, nil
+               },
+               ResponseParser: func(iQuery interface{}, variables map[string]interface{}) ([]interface{}, error) {
+                       query := iQuery.(*GraphqlQueryIssueWrapper)
+                       issues := query.Repository.IssueList.Issues
+
+                       // At least one record per issue is produced, so start from that capacity.
+                       results := make([]interface{}, 0, len(issues))
+                       for _, issue := range issues {
+                               githubIssue, err := convertGithubIssue(issue, data.Options.ConnectionId, data.Repo.GithubId)
+                               if err != nil {
+                                       return nil, err
+                               }
+                               // convertGithubLabels also fills severity/component/priority/type on githubIssue.
+                               githubLabels, err := convertGithubLabels(issueRegexes, issue, githubIssue)
+                               if err != nil {
+                                       return nil, err
+                               }
+                               results = append(results, githubLabels...)
+                               results = append(results, githubIssue)
+                               if len(issue.AssigneeList.Assignees) > 0 {
+                                       relatedUser, err := convertGraphqlPreAccount(issue.AssigneeList.Assignees[0], data.Repo.GithubId, data.Options.ConnectionId)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       results = append(results, relatedUser)
+                               }
+                               if issue.Author != nil {
+                                       relatedUser, err := convertGraphqlPreAccount(*issue.Author, data.Repo.GithubId, data.Options.ConnectionId)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       results = append(results, relatedUser)
+                               }
+                       }
+                       return results, nil
+               },
+       })
+
+       if err != nil {
+               return err
+       }
+
+       return collector.Execute()
+}
+
+// convertGithubIssue turns one GraphQL issue node into a models.GithubIssue
+// bound to the given connection and repository.
+//
+// Optional data is copied only when present: the first assignee, the author,
+// the milestone number, and the lead time in minutes (closed minus created),
+// which is set only for closed issues. The returned error is always nil.
+func convertGithubIssue(issue GraphqlQueryIssue, connectionId uint64, repositoryId int) (*models.GithubIssue, error) {
+       converted := &models.GithubIssue{
+               ConnectionId:    connectionId,
+               GithubId:        issue.DatabaseId,
+               RepoId:          repositoryId,
+               Number:          issue.Number,
+               State:           issue.State,
+               Title:           issue.Title,
+               Body:            issue.Body,
+               Url:             issue.Url,
+               ClosedAt:        issue.ClosedAt,
+               GithubCreatedAt: issue.CreatedAt,
+               GithubUpdatedAt: issue.UpdatedAt,
+       }
+       if assignees := issue.AssigneeList.Assignees; len(assignees) > 0 {
+               converted.AssigneeId = assignees[0].Id
+               converted.AssigneeName = assignees[0].Login
+       }
+       if author := issue.Author; author != nil {
+               converted.AuthorId = author.Id
+               converted.AuthorName = author.Login
+       }
+       if closedAt := issue.ClosedAt; closedAt != nil {
+               leadTime := closedAt.Sub(issue.CreatedAt)
+               converted.LeadTimeMinutes = uint(leadTime.Minutes())
+       }
+       if milestone := issue.Milestone; milestone != nil {
+               converted.MilestoneId = milestone.Number
+       }
+       return converted, nil
+}
+
+// convertGithubLabels builds one models.GithubIssueLabel per label of the
+// issue and, as a side effect, enriches githubIssue from the label names:
+//
+//   - Severity, Component and Priority take the first capture group of the
+//     matching regex;
+//   - Type becomes ticket.BUG, ticket.REQUIREMENT or ticket.INCIDENT when the
+//     corresponding regex matches.
+//
+// Regexes that are nil are skipped. When several labels match, the last
+// matching label wins. The returned error is always nil.
+func convertGithubLabels(issueRegexes *githubTasks.IssueRegexes, issue GraphqlQueryIssue, githubIssue *models.GithubIssue) ([]interface{}, error) {
+       var results []interface{}
+       for _, label := range issue.Labels.Nodes {
+               results = append(results, &models.GithubIssueLabel{
+                       ConnectionId: githubIssue.ConnectionId,
+                       IssueId:      githubIssue.GithubId,
+                       LabelName:    label.Name,
+               })
+               // FindStringSubmatch returns the whole match at index 0, so a capture
+               // group only exists when the slice holds more than one element. Using
+               // `> 0` here would panic for a configured pattern without a group.
+               if issueRegexes.SeverityRegex != nil {
+                       groups := issueRegexes.SeverityRegex.FindStringSubmatch(label.Name)
+                       if len(groups) > 1 {
+                               githubIssue.Severity = groups[1]
+                       }
+               }
+               if issueRegexes.ComponentRegex != nil {
+                       groups := issueRegexes.ComponentRegex.FindStringSubmatch(label.Name)
+                       if len(groups) > 1 {
+                               githubIssue.Component = groups[1]
+                       }
+               }
+               if issueRegexes.PriorityRegex != nil {
+                       groups := issueRegexes.PriorityRegex.FindStringSubmatch(label.Name)
+                       if len(groups) > 1 {
+                               githubIssue.Priority = groups[1]
+                       }
+               }
+               if issueRegexes.TypeBugRegex != nil {
+                       if ok := issueRegexes.TypeBugRegex.MatchString(label.Name); ok {
+                               githubIssue.Type = ticket.BUG
+                       }
+               }
+               if issueRegexes.TypeRequirementRegex != nil {
+                       if ok := issueRegexes.TypeRequirementRegex.MatchString(label.Name); ok {
+                               githubIssue.Type = ticket.REQUIREMENT
+                       }
+               }
+               if issueRegexes.TypeIncidentRegex != nil {
+                       if ok := issueRegexes.TypeIncidentRegex.MatchString(label.Name); ok {
+                               githubIssue.Type = ticket.INCIDENT
+                       }
+               }
+       }
+       return results, nil
+}
diff --git a/plugins/github_graphql/tasks/pr_collector.go 
b/plugins/github_graphql/tasks/pr_collector.go
new file mode 100755
index 00000000..08ec5ed5
--- /dev/null
+++ b/plugins/github_graphql/tasks/pr_collector.go
@@ -0,0 +1,333 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+       "fmt"
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/github/models"
+       githubTasks "github.com/apache/incubator-devlake/plugins/github/tasks"
+       "github.com/apache/incubator-devlake/plugins/helper"
+       "github.com/merico-dev/graphql"
+       "regexp"
+       "runtime/debug"
+       "time"
+)
+
+// RAW_PRS_TABLE is the raw table name CollectPr passes to the collector.
+const RAW_PRS_TABLE = "github_graphql_prs"
+
+// GraphqlQueryPrWrapper is the query struct built by CollectPr: the rate-limit
+// cost plus one page of pull requests of the repository selected by the
+// $owner/$name variables. Paging is driven by $pageSize and $skipCursor.
+type GraphqlQueryPrWrapper struct {
+       RateLimit struct {
+               Cost int
+       }
+       Repository struct {
+               PullRequests struct {
+                       // PageInfo is what CollectPr's GetPageInfo callback returns.
+                       PageInfo   *helper.GraphqlQueryPageInfo
+                       Prs        []GraphqlQueryPr `graphql:"nodes"`
+                       TotalCount graphql.Int
+               } `graphql:"pullRequests(first: $pageSize, after: $skipCursor)"`
+       } `graphql:"repository(owner: $owner, name: $name)"`
+}
+
+// GraphqlQueryPr is a single pull request node of GraphqlQueryPrWrapper.
+//
+// NOTE: labels, commits and reviews are each requested with `first: 100`, so
+// only the first 100 of each are fetched per pull request; CollectPr reads
+// Commits.Nodes and Reviews.Nodes but does not page through them.
+type GraphqlQueryPr struct {
+       DatabaseId int
+       Number     int
+       State      string
+       Title      string
+       Body       string
+       Url        string
+       Labels     struct {
+               Nodes []struct {
+                       Id   string
+                       Name string
+               }
+       } `graphql:"labels(first: 100)"`
+       // Author is nil-checked by CollectPr and convertGithubPullRequest.
+       Author    *GraphqlInlineAccountQuery
+       Assignees struct {
+               // FIXME now domain layer just support one assignee
+               Assignees []GraphqlInlineAccountQuery `graphql:"nodes"`
+       } `graphql:"assignees(first: 1)"`
+       ClosedAt    *time.Time
+       MergedAt    *time.Time
+       UpdatedAt   time.Time
+       CreatedAt   time.Time
+       // MergeCommit is nil-checked by convertGithubPullRequest.
+       MergeCommit *struct {
+               Oid string
+       }
+       HeadRefName string
+       HeadRefOid  string
+       BaseRefName string
+       BaseRefOid  string
+       Commits     struct {
+               PageInfo   *helper.GraphqlQueryPageInfo
+               Nodes      []GraphqlQueryCommit `graphql:"nodes"`
+               TotalCount graphql.Int
+       } `graphql:"commits(first: 100)"`
+       Reviews struct {
+               TotalCount graphql.Int
+               Nodes      []GraphqlQueryReview `graphql:"nodes"`
+       } `graphql:"reviews(first: 100)"`
+}
+
+// GraphqlQueryReview is a single review node of GraphqlQueryPr.Reviews.
+type GraphqlQueryReview struct {
+       Body       string
+       // Author is a pointer and may therefore be nil after decoding.
+       Author     *GraphqlInlineAccountQuery
+       // State is compared against "PENDING" by CollectPr; such reviews are skipped.
+       State      string `json:"state"`
+       DatabaseId int    `json:"databaseId"`
+       Commit     struct {
+               Oid string
+       }
+       SubmittedAt *time.Time `json:"submittedAt"`
+}
+
+// GraphqlQueryCommit is a single commit node of GraphqlQueryPr.Commits; the
+// commit data itself is nested under Commit, next to the node's Url.
+type GraphqlQueryCommit struct {
+       Commit struct {
+               Oid     string
+               Message string
+               Author  struct {
+                       Name  string
+                       Email string
+                       Date  time.Time
+                       // User is nil-checked by CollectPr and convertPullRequestCommit.
+                       User  *GraphqlInlineAccountQuery
+               }
+               Committer struct {
+                       Date  time.Time
+                       Email string
+                       Name  string
+               }
+       }
+       Url string
+}
+
+// CollectPrMeta describes the CollectPr subtask; it is enabled by default.
+var CollectPrMeta = core.SubTaskMeta{
+       Name:             "CollectPr",
+       EntryPoint:       CollectPr,
+       EnabledByDefault: true,
+       Description:      "Collect Pr data from GithubGraphql api",
+}
+
+// Compile-time check that CollectPr has the core.SubTaskEntryPoint signature.
+var _ core.SubTaskEntryPoint = CollectPr
+
+// CollectPr builds a graphql collector that pages through the pull requests
+// of the repository named in the task options (100 per page) and executes it.
+//
+// For every pull request the response parser emits the author's account (if
+// any), one label record per label, the pull request itself, then for every
+// non-PENDING review a reviewer record, the review and the reviewer's account,
+// and finally for every commit the commit, its pull-request link and the
+// commit author's account (if any).
+//
+// The PrType and PrComponent transformation rules, when non-empty, are
+// compiled as regexes; the first capture group of a matching label name is
+// stored as the pull request's Type / Component.
+//
+// It returns an error when one of those patterns does not compile, when the
+// collector cannot be created, or when executing the collector fails.
+func CollectPr(taskCtx core.SubTaskContext) error {
+       data := taskCtx.GetData().(*githubTasks.GithubTaskData)
+       config := data.Options.TransformationRules
+       var labelTypeRegex *regexp.Regexp
+       var labelComponentRegex *regexp.Regexp
+       var prType = config.PrType
+       var err error
+       if len(prType) > 0 {
+               labelTypeRegex, err = regexp.Compile(prType)
+               if err != nil {
+                       return fmt.Errorf("regexp Compile prType failed:[%s] stack:[%s]", err.Error(), debug.Stack())
+               }
+       }
+       var prComponent = config.PrComponent
+       if len(prComponent) > 0 {
+               labelComponentRegex, err = regexp.Compile(prComponent)
+               if err != nil {
+                       return fmt.Errorf("regexp Compile prComponent failed:[%s] stack:[%s]", err.Error(), debug.Stack())
+               }
+       }
+
+       collector, err := helper.NewGraphqlCollector(helper.GraphqlCollectorArgs{
+               RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+                       Ctx: taskCtx,
+                       Params: githubTasks.GithubApiParams{
+                               ConnectionId: data.Options.ConnectionId,
+                               Owner:        data.Options.Owner,
+                               Repo:         data.Options.Repo,
+                       },
+                       Table: RAW_PRS_TABLE,
+               },
+               GraphqlClient: data.GraphqlClient,
+               PageSize:      100,
+               // BuildQuery returns the query struct to be filled in and the values
+               // bound to the $variables referenced by its graphql tags.
+               BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
+                       query := &GraphqlQueryPrWrapper{}
+                       variables := map[string]interface{}{
+                               "pageSize":   graphql.Int(reqData.Pager.Size),
+                               "skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
+                               "owner":      graphql.String(data.Options.Owner),
+                               "name":       graphql.String(data.Options.Repo),
+                       }
+                       return query, variables, nil
+               },
+               GetPageInfo: func(iQuery interface{}, args *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
+                       query := iQuery.(*GraphqlQueryPrWrapper)
+                       return query.Repository.PullRequests.PageInfo, nil
+               },
+               ResponseParser: func(iQuery interface{}, variables map[string]interface{}) ([]interface{}, error) {
+                       query := iQuery.(*GraphqlQueryPrWrapper)
+                       prs := query.Repository.PullRequests.Prs
+
+                       // At least one record per pull request is produced.
+                       results := make([]interface{}, 0, len(prs))
+                       for _, rawL := range prs {
+                               githubPr, err := convertGithubPullRequest(rawL, data.Options.ConnectionId, data.Repo.GithubId)
+                               if err != nil {
+                                       return nil, err
+                               }
+                               if rawL.Author != nil {
+                                       githubUser, err := convertGraphqlPreAccount(*rawL.Author, data.Repo.GithubId, data.Options.ConnectionId)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       results = append(results, githubUser)
+                               }
+                               for _, label := range rawL.Labels.Nodes {
+                                       results = append(results, &models.GithubPrLabel{
+                                               ConnectionId: data.Options.ConnectionId,
+                                               PullId:       githubPr.GithubId,
+                                               LabelName:    label.Name,
+                                       })
+                                       // Index 0 of FindStringSubmatch is the whole match; the capture
+                                       // group exists only when the slice has more than one element.
+                                       if labelTypeRegex != nil {
+                                               groups := labelTypeRegex.FindStringSubmatch(label.Name)
+                                               if len(groups) > 1 {
+                                                       githubPr.Type = groups[1]
+                                               }
+                                       }
+                                       if labelComponentRegex != nil {
+                                               groups := labelComponentRegex.FindStringSubmatch(label.Name)
+                                               if len(groups) > 1 {
+                                                       githubPr.Component = groups[1]
+                                               }
+                                       }
+                               }
+                               results = append(results, githubPr)
+
+                               for _, apiPullRequestReview := range rawL.Reviews.Nodes {
+                                       if apiPullRequestReview.State == "PENDING" {
+                                               continue
+                                       }
+                                       githubPrReview := &models.GithubPrReview{
+                                               ConnectionId:   data.Options.ConnectionId,
+                                               GithubId:       apiPullRequestReview.DatabaseId,
+                                               Body:           apiPullRequestReview.Body,
+                                               State:          apiPullRequestReview.State,
+                                               CommitSha:      apiPullRequestReview.Commit.Oid,
+                                               GithubSubmitAt: apiPullRequestReview.SubmittedAt,
+                                               PullRequestId:  githubPr.GithubId,
+                                       }
+                                       // Author is a pointer and is guarded like every other author in
+                                       // this file: without one, keep the review but emit neither a
+                                       // reviewer nor an account record.
+                                       reviewAuthor := apiPullRequestReview.Author
+                                       if reviewAuthor != nil {
+                                               githubPrReview.AuthorUsername = reviewAuthor.Login
+                                               githubPrReview.AuthorUserId = reviewAuthor.Id
+                                               results = append(results, &models.GithubReviewer{
+                                                       ConnectionId:  data.Options.ConnectionId,
+                                                       GithubId:      reviewAuthor.Id,
+                                                       Login:         reviewAuthor.Login,
+                                                       PullRequestId: githubPr.GithubId,
+                                               })
+                                       }
+                                       results = append(results, githubPrReview)
+                                       if reviewAuthor != nil {
+                                               githubUser, err := convertGraphqlPreAccount(*reviewAuthor, data.Repo.GithubId, data.Options.ConnectionId)
+                                               if err != nil {
+                                                       return nil, err
+                                               }
+                                               results = append(results, githubUser)
+                                       }
+                               }
+
+                               for _, apiPullRequestCommit := range rawL.Commits.Nodes {
+                                       githubCommit, err := convertPullRequestCommit(apiPullRequestCommit)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       results = append(results, githubCommit)
+
+                                       results = append(results, &models.GithubPrCommit{
+                                               ConnectionId:  data.Options.ConnectionId,
+                                               CommitSha:     apiPullRequestCommit.Commit.Oid,
+                                               PullRequestId: githubPr.GithubId,
+                                       })
+
+                                       if apiPullRequestCommit.Commit.Author.User != nil {
+                                               githubUser, err := convertGraphqlPreAccount(*apiPullRequestCommit.Commit.Author.User, data.Repo.GithubId, data.Options.ConnectionId)
+                                               if err != nil {
+                                                       return nil, err
+                                               }
+                                               results = append(results, githubUser)
+                                       }
+                               }
+                       }
+                       return results, nil
+               },
+       })
+
+       if err != nil {
+               return err
+       }
+
+       return collector.Execute()
+}
+
+// convertGithubPullRequest turns one GraphQL pull request node into a
+// models.GithubPullRequest bound to the given connection and repository.
+//
+// The merge commit sha and the author name/id are copied only when the
+// corresponding pointer is non-nil. The returned error is always nil.
+func convertGithubPullRequest(pull GraphqlQueryPr, connId uint64, repoId int) (*models.GithubPullRequest, error) {
+       converted := &models.GithubPullRequest{
+               ConnectionId:    connId,
+               GithubId:        pull.DatabaseId,
+               RepoId:          repoId,
+               Number:          pull.Number,
+               State:           pull.State,
+               Title:           pull.Title,
+               Url:             pull.Url,
+               GithubCreatedAt: pull.CreatedAt,
+               GithubUpdatedAt: pull.UpdatedAt,
+               ClosedAt:        pull.ClosedAt,
+               MergedAt:        pull.MergedAt,
+               Body:            pull.Body,
+               BaseRef:         pull.BaseRefName,
+               BaseCommitSha:   pull.BaseRefOid,
+               HeadRef:         pull.HeadRefName,
+               HeadCommitSha:   pull.HeadRefOid,
+       }
+       if mergeCommit := pull.MergeCommit; mergeCommit != nil {
+               converted.MergeCommitSha = mergeCommit.Oid
+       }
+       if author := pull.Author; author != nil {
+               converted.AuthorName = author.Login
+               converted.AuthorId = author.Id
+       }
+       return converted, nil
+}
+
+// convertPullRequestCommit turns one GraphQL commit node into a
+// models.GithubCommit. AuthorId is filled only when the commit author is
+// linked to a user. The returned error is always nil.
+func convertPullRequestCommit(prCommit GraphqlQueryCommit) (*models.GithubCommit, error) {
+       commit := prCommit.Commit
+       converted := &models.GithubCommit{
+               Sha:            commit.Oid,
+               Message:        commit.Message,
+               AuthorName:     commit.Author.Name,
+               AuthorEmail:    commit.Author.Email,
+               AuthoredDate:   commit.Author.Date,
+               CommitterName:  commit.Committer.Name,
+               CommitterEmail: commit.Committer.Email,
+               CommittedDate:  commit.Committer.Date,
+               Url:            prCommit.Url,
+       }
+       if user := commit.Author.User; user != nil {
+               converted.AuthorId = user.Id
+       }
+       return converted, nil
+}
diff --git a/plugins/github_graphql/tasks/repo_collector.go 
b/plugins/github_graphql/tasks/repo_collector.go
new file mode 100755
index 00000000..18e1f62b
--- /dev/null
+++ b/plugins/github_graphql/tasks/repo_collector.go
@@ -0,0 +1,136 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/github/models"
+       githubTasks "github.com/apache/incubator-devlake/plugins/github/tasks"
+       "github.com/apache/incubator-devlake/plugins/helper"
+       "github.com/merico-dev/graphql"
+       "time"
+)
+
+// RAW_REPO_TABLE is the raw table name CollectRepo passes to the collector.
+const RAW_REPO_TABLE = "github_graphql_repo"
+
+// Compile-time check that CollectRepo has the core.SubTaskEntryPoint signature.
+var _ core.SubTaskEntryPoint = CollectRepo
+
+// GraphqlQueryRepo is the query struct built by CollectRepo: the rate-limit
+// cost plus the repository selected by the $owner/$name variables. The
+// `graphql` tags map the Go field names onto the queried GraphQL fields.
+type GraphqlQueryRepo struct {
+       RateLimit struct {
+               Cost int
+       }
+       Repository struct {
+               Name      string `graphql:"name"`
+               GithubId  int    `graphql:"databaseId"`
+               HTMLUrl   string `graphql:"url"`
+               // Languages requests at most one language; Nodes may be empty.
+               Languages struct {
+                       Nodes []struct {
+                               Name string
+                       }
+               } `graphql:"languages(first: 1)"`
+               Description string `graphql:"description"`
+               Owner       GraphqlInlineAccountQuery
+               CreatedDate time.Time  `graphql:"createdAt"`
+               UpdatedDate *time.Time `graphql:"updatedAt"`
+               // Parent is nil-checked by CollectRepo before its fields are copied.
+               Parent      *struct {
+                       GithubId int    `graphql:"databaseId"`
+                       HTMLUrl  string `graphql:"url"`
+               }
+       } `graphql:"repository(owner: $owner, name: $name)"`
+}
+
+// CollectRepoMeta describes the CollectRepo subtask; it is enabled by default.
+var CollectRepoMeta = core.SubTaskMeta{
+       Name:             "CollectRepo",
+       EntryPoint:       CollectRepo,
+       EnabledByDefault: true,
+       Description:      "Collect Repo data from GithubGraphql api",
+}
+
+// CollectRepo builds a graphql collector that queries the repository named in
+// the task options and executes it.
+//
+// The response parser converts the answer into a models.GithubRepo, stores it
+// on the task data (data.Repo), persists it with CreateOrUpdate and emits the
+// repository plus its owner's account.
+//
+// It returns an error when the collector cannot be created or when executing
+// it fails; the parser itself fails when persisting the repository or
+// converting the owner account fails.
+func CollectRepo(taskCtx core.SubTaskContext) error {
+       data := taskCtx.GetData().(*githubTasks.GithubTaskData)
+       db := taskCtx.GetDal()
+
+       collector, err := helper.NewGraphqlCollector(helper.GraphqlCollectorArgs{
+               RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+                       Ctx: taskCtx,
+                       /*
+                               This struct will be JSON-encoded and stored in the database along with the raw data itself,
+                               to identify the minimal set of data to be processed; for example, we process JiraIssues by Board
+                       */
+                       Params: githubTasks.GithubApiParams{
+                               ConnectionId: data.Options.ConnectionId,
+                               Owner:        data.Options.Owner,
+                               Repo:         data.Options.Repo,
+                       },
+                       Table: RAW_REPO_TABLE,
+               },
+               GraphqlClient: data.GraphqlClient,
+               // BuildQuery returns the query struct to be filled in and the values
+               // bound to the $variables referenced by its graphql tags.
+               BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
+                       query := &GraphqlQueryRepo{}
+                       variables := map[string]interface{}{
+                               "owner": graphql.String(data.Options.Owner),
+                               "name":  graphql.String(data.Options.Repo),
+                       }
+                       return query, variables, nil
+               },
+               ResponseParser: func(iQuery interface{}, variables map[string]interface{}) ([]interface{}, error) {
+                       query := iQuery.(*GraphqlQueryRepo)
+                       repository := query.Repository
+                       results := make([]interface{}, 0, 2)
+                       githubRepository := &models.GithubRepo{
+                               ConnectionId: data.Options.ConnectionId,
+                               GithubId:     repository.GithubId,
+                               Name:         repository.Name,
+                               HTMLUrl:      repository.HTMLUrl,
+                               Description:  repository.Description,
+                               OwnerId:      repository.Owner.Id,
+                               OwnerLogin:   repository.Owner.Login,
+                               CreatedDate:  repository.CreatedDate,
+                               UpdatedDate:  repository.UpdatedDate,
+                       }
+                       // The language list may be empty (for example an empty repository);
+                       // indexing it unconditionally would panic.
+                       if len(repository.Languages.Nodes) > 0 {
+                               githubRepository.Language = repository.Languages.Nodes[0].Name
+                       }
+                       if repository.Parent != nil {
+                               githubRepository.ParentGithubId = repository.Parent.GithubId
+                               githubRepository.ParentHTMLUrl = repository.Parent.HTMLUrl
+                       }
+                       // The other collectors read data.Repo.GithubId, so publish the
+                       // repository on the shared task data.
+                       data.Repo = githubRepository
+
+                       err := db.CreateOrUpdate(githubRepository)
+                       if err != nil {
+                               return nil, err
+                       }
+                       results = append(results, githubRepository)
+
+                       githubUser, err := convertGraphqlPreAccount(repository.Owner, data.Repo.GithubId, data.Options.ConnectionId)
+                       if err != nil {
+                               return nil, err
+                       }
+                       results = append(results, githubUser)
+                       return results, nil
+               },
+       })
+
+       if err != nil {
+               return err
+       }
+
+       return collector.Execute()
+}
diff --git a/plugins/helper/graphql_async_client.go 
b/plugins/helper/graphql_async_client.go
new file mode 100644
index 00000000..1dbb4484
--- /dev/null
+++ b/plugins/helper/graphql_async_client.go
@@ -0,0 +1,165 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package helper
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/merico-dev/graphql"
+       "sync"
+       "time"
+)
+
+// GraphqlAsyncClient sends graphql queries one at a time while tracking a rate budget.
+type GraphqlAsyncClient struct {
+       ctx          context.Context    // cancellable context the client works on
+       cancel       context.CancelFunc // cancels ctx
+       client       *graphql.Client    // underlying graphql client that performs the requests
+       logger       core.Logger
+       mu           sync.Mutex     // held by Query for the whole call, so queries never overlap
+       waitGroup    sync.WaitGroup // counts running queries and scheduled tasks; Wait blocks on it
+       workerErrors []error        // non-nil errors returned by tasks scheduled through NextTick
+
+       rateExhaustCond  *sync.Cond // Query waits on it while rateRemaining <= 0
+       rateRemaining    int        // remaining rate budget; reduced by the query cost after each successful query
+       getRateRemaining func(context.Context, *graphql.Client, core.Logger) 
(rateRemaining int, resetAt *time.Time, err error) // reports the current budget and when it resets
+       getRateCost      func(q interface{}) int // optional; when nil every query costs 1
+}
+
+// CreateAsyncGraphqlClient builds a GraphqlAsyncClient around graphqlClient.
+// The client works on a cancellable context derived from ctx. When
+// getRateRemaining is non-nil it is called once, synchronously, to seed the
+// rate budget and to schedule the periodic refresh; if that first call fails
+// the function panics. When getRateRemaining is nil the budget is left at 0.
+func CreateAsyncGraphqlClient(
+       ctx context.Context,
+       graphqlClient *graphql.Client,
+       logger core.Logger,
+       getRateRemaining func(context.Context, *graphql.Client, core.Logger) (rateRemaining int, resetAt *time.Time, err error),
+) *GraphqlAsyncClient {
+       derivedCtx, cancelFn := context.WithCancel(ctx)
+       asyncClient := &GraphqlAsyncClient{
+               ctx:              derivedCtx,
+               cancel:           cancelFn,
+               client:           graphqlClient,
+               logger:           logger,
+               rateExhaustCond:  sync.NewCond(&sync.Mutex{}),
+               rateRemaining:    0,
+               getRateRemaining: getRateRemaining,
+       }
+       if getRateRemaining == nil {
+               return asyncClient
+       }
+       // the initial probe runs on the caller's ctx, not on the derived one
+       remaining, resetAt, err := getRateRemaining(ctx, graphqlClient, logger)
+       if err != nil {
+               panic(err)
+       }
+       asyncClient.updateRateRemaining(remaining, resetAt)
+       return asyncClient
+}
+
+// updateRateRemaining stores the freshly observed rate budget, wakes a query
+// that is blocked on an exhausted budget, and schedules the next refresh.
+// The refresh fires when resetAt is reached, or after 3 minutes when resetAt
+// is nil or already in the past. The refresh goroutine exits as soon as the
+// client context is done; any other getRateRemaining failure still panics.
+func (apiClient *GraphqlAsyncClient) updateRateRemaining(rateRemaining int, resetAt *time.Time) {
+       // Query reads and decrements rateRemaining under rateExhaustCond.L, so the
+       // write and the Signal must hold the same lock to avoid a lost wakeup
+       apiClient.rateExhaustCond.L.Lock()
+       apiClient.rateRemaining = rateRemaining
+       if rateRemaining > 0 {
+               apiClient.rateExhaustCond.Signal()
+       }
+       apiClient.rateExhaustCond.L.Unlock()
+
+       go func() {
+               nextDuring := 3 * time.Minute
+               if resetAt != nil && resetAt.After(time.Now()) {
+                       nextDuring = time.Until(*resetAt)
+               }
+               timer := time.NewTimer(nextDuring)
+               defer timer.Stop()
+               select {
+               case <-apiClient.ctx.Done():
+                       // the client was cancelled: stop refreshing instead of querying with a dead context
+                       return
+               case <-timer.C:
+               }
+               newRateRemaining, newResetAt, err := apiClient.getRateRemaining(apiClient.ctx, apiClient.client, apiClient.logger)
+               if err != nil {
+                       if apiClient.ctx.Err() != nil {
+                               // cancelled while the request was in flight
+                               return
+                       }
+                       panic(err)
+               }
+               apiClient.updateRateRemaining(newRateRemaining, newResetAt)
+       }()
+}
+
+// SetGetRateCost installs the callback that computes how much rate budget a query consumes.
+// If no callback is set, every query costs 1.
+func (apiClient *GraphqlAsyncClient) SetGetRateCost(getRateCost func(q 
interface{}) int) {
+       apiClient.getRateCost = getRateCost
+}
+
+// Query runs one graphql request. Requests are serialized by the client lock
+// and held back while the rate budget is exhausted. After a successful request
+// the budget is reduced by the query's cost (1 unless a cost callback is set).
+// It returns nil without sending anything when the client context is already
+// done, and the request error when the request fails.
+func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]interface{}) error {
+       apiClient.waitGroup.Add(1)
+       defer apiClient.waitGroup.Done()
+       apiClient.mu.Lock()
+       defer apiClient.mu.Unlock()
+
+       budgetLock := apiClient.rateExhaustCond.L
+       budgetLock.Lock()
+       defer budgetLock.Unlock()
+       for apiClient.rateRemaining <= 0 {
+               apiClient.logger.Info(`rate limit remaining exhausted, waiting for next period.`)
+               apiClient.rateExhaustCond.Wait()
+       }
+       if apiClient.ctx.Err() != nil {
+               return nil
+       }
+       if err := apiClient.client.Query(apiClient.ctx, q, variables); err != nil {
+               return err
+       }
+       spent := 1
+       if costOf := apiClient.getRateCost; costOf != nil {
+               spent = costOf(q)
+       }
+       apiClient.rateRemaining -= spent
+       apiClient.logger.Debug(`query cost %d in %v`, spent, variables)
+       return nil
+}
+
+// NextTick schedules task to run on its own goroutine and returns immediately.
+// The task is skipped when the client context is already done. A non-nil error
+// returned by the task is recorded through checkError.
+func (apiClient *GraphqlAsyncClient) NextTick(task func() error) {
+       // register before the goroutine starts so Wait cannot miss the task
+       apiClient.waitGroup.Add(1)
+       go func() {
+               // Done must run only after the task has finished; otherwise Wait could
+               // return while the task is still pending or running
+               defer apiClient.waitGroup.Done()
+               select {
+               case <-apiClient.ctx.Done():
+                       return
+               default:
+                       apiClient.checkError(task())
+               }
+       }()
+}
+
+// Wait blocks until every query and scheduled task tracked by the wait group
+// has finished. It returns nil when no task error was recorded, otherwise a
+// single error whose message lists all recorded errors.
+func (apiClient *GraphqlAsyncClient) Wait() error {
+       apiClient.waitGroup.Wait()
+       if len(apiClient.workerErrors) == 0 {
+               return nil
+       }
+       return fmt.Errorf("%s", apiClient.workerErrors)
+}
+
+// checkError records a non-nil task error so that Wait and HasError can report it.
+// Tasks scheduled through NextTick finish on their own goroutines, so the
+// append is guarded by the client lock. Query holds that lock for the duration
+// of a request, so this call may wait for an in-flight query to finish.
+func (apiClient *GraphqlAsyncClient) checkError(err error) {
+       if err == nil {
+               return
+       }
+       apiClient.mu.Lock()
+       defer apiClient.mu.Unlock()
+       apiClient.workerErrors = append(apiClient.workerErrors, err)
+}
+
+// HasError reports whether any task error has been recorded so far.
+// It takes the client lock to read the error list safely while tasks may
+// still be recording errors, so it may wait for an in-flight query to finish.
+func (apiClient *GraphqlAsyncClient) HasError() bool {
+       apiClient.mu.Lock()
+       defer apiClient.mu.Unlock()
+       return len(apiClient.workerErrors) > 0
+}
diff --git a/plugins/helper/graphql_collector.go 
b/plugins/helper/graphql_collector.go
new file mode 100644
index 00000000..ed69e0cc
--- /dev/null
+++ b/plugins/helper/graphql_collector.go
@@ -0,0 +1,309 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package helper
+
+import (
+       "encoding/json"
+       "fmt"
+       "github.com/apache/incubator-devlake/models/common"
+       "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
+       "github.com/merico-dev/graphql"
+       "net/http"
+       "reflect"
+)
+
+// CursorPager contains the cursor-based pagination information for a graphql request.
+type CursorPager struct {
+       SkipCursor *string // cursor to continue after; nil for the first page
+       Size       int     // page size, taken from GraphqlCollectorArgs.PageSize
+}
+
+// GraphqlRequestData is the input of `BuildQuery`, so the query and its
+// variables can be generated dynamically for each request.
+type GraphqlRequestData struct {
+       Pager     *CursorPager // pagination state of the request
+       Params    interface{}  // NOTE(review): not populated by the collector code visible here - confirm usage
+       Input     interface{}  // item (or slice of items) fetched from the Input iterator; nil without an iterator
+       InputJSON []byte       // json encoding of Input, stored with the raw data row
+}
+
+// GraphqlQueryPageInfo is the paging information returned by `GetPageInfo` for a fetched page.
+type GraphqlQueryPageInfo struct {
+       EndCursor   string `json:"endCursor"`   // used as SkipCursor when requesting the next page
+       HasNextPage bool   `json:"hasNextPage"` // another page is requested only when true
+}
+
+// GraphqlAsyncResponseHandler is a callback that receives an HTTP response and may reject it with an error.
+// NOTE(review): nothing in the visible part of this file references it - confirm it is still needed.
+type GraphqlAsyncResponseHandler func(res *http.Response) error
+
+// GraphqlCollectorArgs is the configuration accepted by NewGraphqlCollector.
+type GraphqlCollectorArgs struct {
+       RawDataSubTaskArgs
+       // BuildQuery builds the query struct and its variables for one request
+       BuildQuery func(reqData *GraphqlRequestData) (query interface{}, 
variables map[string]interface{}, err error)
+       // PageSize tells GraphqlCollector the page size
+       PageSize int
+       // GraphqlClient is the asynchronous graphql client used to send the queries; required
+       GraphqlClient *GraphqlAsyncClient
+       // Input helps us collect data based on previously collected data, like
+       // collecting changelogs based on jira issue ids
+       Input Iterator
+       // InputStep is how many items are fetched from Input per request, defaults to 1
+       // NOTICE: InputStep=1 passes the item itself as input, InputStep>1 passes a []item
+       InputStep      int
+       // IgnoreQueryErr makes a failed query get logged and skipped instead of panicking
+       IgnoreQueryErr bool
+       // GetPageInfo extracts the paging information from a fetched query;
+       // when nil only a single page is fetched per input
+       GetPageInfo    func(query interface{}, args *GraphqlCollectorArgs) 
(*GraphqlQueryPageInfo, error)
+       // BatchSize is passed to the batch save divider, defaults to 500
+       BatchSize      int
+       // ResponseParser converts a fetched query into the records to be saved; required
+       ResponseParser func(query interface{}, variables 
map[string]interface{}) ([]interface{}, error)
+}
+
+// GraphqlCollector collects data through graphql queries, stores each fetched
+// page as a raw data row and saves the parsed records in batches.
+type GraphqlCollector struct {
+       *RawDataSubTask
+       args *GraphqlCollectorArgs // validated arguments, with defaults applied by NewGraphqlCollector
+}
+
+// NewGraphqlCollector allocates a new GraphqlCollector with the given args.
+// GraphqlCollector can help us collect data from api with ease, pass in a 
AsyncGraphqlClient and tell it which part
+// of response we want to save, GraphqlCollector will collect them from remote 
server and store them into database.
+func NewGraphqlCollector(args GraphqlCollectorArgs) (*GraphqlCollector, error) 
{
+       // process args
+       rawDataSubTask, err := newRawDataSubTask(args.RawDataSubTaskArgs)
+       if err != nil {
+               return nil, err
+       }
+       if err != nil {
+               return nil, fmt.Errorf("Failed to compile UrlTemplate: %w", err)
+       }
+       if args.GraphqlClient == nil {
+               return nil, fmt.Errorf("ApiClient is required")
+       }
+       if args.ResponseParser == nil {
+               return nil, fmt.Errorf("ResponseParser is required")
+       }
+       apicllector := &GraphqlCollector{
+               RawDataSubTask: rawDataSubTask,
+               args:           &args,
+       }
+       if args.BatchSize == 0 {
+               args.BatchSize = 500
+       }
+       if args.InputStep == 0 {
+               args.InputStep = 1
+       }
+       //if args.AfterResponse != nil {
+       //      apicllector.SetAfterResponse(args.AfterResponse)
+       //} else {
+       //      apicllector.SetAfterResponse(func(res *http.Response) error {
+       //              if res.StatusCode == http.StatusUnauthorized {
+       //                      return fmt.Errorf("authentication failed, 
please check your AccessToken")
+       //              }
+       //              return nil
+       //      })
+       //}
+       return apicllector, nil
+}
+
+// Execute runs the collection: it makes sure the raw table exists, deletes the
+// raw rows previously stored for the same params, sends one request chain per
+// input (or a single one when no Input iterator is given), waits for all
+// scheduled requests and finally flushes the batch save divider.
+//
+// It returns the first failure among: preparing the table, fetching from the
+// Input iterator, the errors reported by the graphql client, and closing the
+// divider. The divider is closed even when collection failed.
+func (collector *GraphqlCollector) Execute() error {
+       logger := collector.args.Ctx.GetLogger()
+       logger.Info("start graphql collection")
+
+       // make sure table is created
+       db := collector.args.Ctx.GetDal()
+       err := db.AutoMigrate(&RawData{}, dal.From(collector.table))
+       if err != nil {
+               return err
+       }
+
+       // flush data if not incremental collection
+       err = db.Delete(&RawData{}, dal.From(collector.table), dal.Where("params = ?", collector.params))
+       if err != nil {
+               return err
+       }
+       divider := NewBatchSaveDivider(collector.args.Ctx, collector.args.BatchSize, collector.table, collector.params)
+
+       collector.args.Ctx.SetProgress(0, -1)
+       // fetchErr remembers an iterator failure so it is reported instead of being dropped
+       var fetchErr error
+       if collector.args.Input != nil {
+               iterator := collector.args.Input
+               defer iterator.Close()
+               apiClient := collector.args.GraphqlClient
+               for fetchErr == nil && iterator.HasNext() && !apiClient.HasError() {
+                       if collector.args.InputStep == 1 {
+                               input, err := iterator.Fetch()
+                               if err != nil {
+                                       fetchErr = err
+                                       break
+                               }
+                               collector.exec(divider, input)
+                               continue
+                       }
+                       var inputs []interface{}
+                       for i := 0; i < collector.args.InputStep && iterator.HasNext(); i++ {
+                               input, err := iterator.Fetch()
+                               if err != nil {
+                                       fetchErr = err
+                                       break
+                               }
+                               inputs = append(inputs, input)
+                       }
+                       // never send a request for an empty batch
+                       if len(inputs) > 0 {
+                               collector.exec(divider, inputs)
+                       }
+               }
+       } else {
+               // or we just did it once
+               collector.exec(divider, nil)
+       }
+
+       logger.Debug("wait for all async api to finished")
+
+       waitErr := collector.args.GraphqlClient.Wait()
+       if waitErr != nil {
+               logger.Info("end api collection error: %v", waitErr)
+       } else {
+               logger.Info("end api collection without error")
+       }
+       // always flush what has been collected so far, even after a failure
+       closeErr := divider.Close()
+
+       switch {
+       case fetchErr != nil:
+               return fetchErr
+       case waitErr != nil:
+               return waitErr
+       }
+       return closeErr
+}
+
+// exec starts the collection for a single input. The input is json-encoded
+// for the raw data row (an encoding failure panics), then either the paged
+// fetch is started when GetPageInfo is configured, or one page is fetched.
+func (collector *GraphqlCollector) exec(divider *BatchSaveDivider, input interface{}) {
+       encodedInput, err := json.Marshal(input)
+       if err != nil {
+               panic(err)
+       }
+       reqData := &GraphqlRequestData{
+               Pager: &CursorPager{
+                       SkipCursor: nil,
+                       Size:       collector.args.PageSize,
+               },
+               Input:     input,
+               InputJSON: encodedInput,
+       }
+       if collector.args.GetPageInfo == nil {
+               collector.fetchAsync(divider, reqData, nil)
+               return
+       }
+       collector.fetchOneByOne(divider, reqData)
+}
+
+// fetchOneByOne fetches all pages of one input sequentially: the first page is
+// fetched right away, and after each page GetPageInfo decides whether the next
+// page (starting after the returned end cursor) is scheduled on the client.
+func (collector *GraphqlCollector) fetchOneByOne(divider *BatchSaveDivider, reqData *GraphqlRequestData) {
+       // onPageFetched is invoked by fetchAsync once a page has been saved
+       var onPageFetched func(query interface{}) error
+       onPageFetched = func(query interface{}) error {
+               pageInfo, err := collector.args.GetPageInfo(query, collector.args)
+               if err != nil {
+                       return fmt.Errorf("fetchPagesDetermined get totalPages faileds: %s", err.Error())
+               }
+               if !pageInfo.HasNextPage {
+                       return nil
+               }
+               collector.args.GraphqlClient.NextTick(func() error {
+                       nextPager := &CursorPager{
+                               SkipCursor: &pageInfo.EndCursor,
+                               Size:       collector.args.PageSize,
+                       }
+                       nextReqData := &GraphqlRequestData{
+                               Pager:     nextPager,
+                               Input:     reqData.Input,
+                               InputJSON: reqData.InputJSON,
+                       }
+                       collector.fetchAsync(divider, nextReqData, onPageFetched)
+                       return nil
+               })
+               return nil
+       }
+       // fetch first page
+       collector.fetchAsync(divider, reqData, onPageFetched)
+}
+
+func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, 
reqData *GraphqlRequestData, handler func(query interface{}) error) {
+       if reqData.Pager == nil {
+               reqData.Pager = &CursorPager{
+                       SkipCursor: nil,
+                       Size:       collector.args.PageSize,
+               }
+       }
+       query, variables, err := collector.args.BuildQuery(reqData)
+       if err != nil {
+               panic(err)
+       }
+
+       logger := collector.args.Ctx.GetLogger()
+       err = collector.args.GraphqlClient.Query(query, variables)
+       if err != nil {
+               if collector.args.IgnoreQueryErr {
+                       logger.Error("fetchAsync fail and got:", err)
+                       return
+               } else {
+                       panic(err)
+               }
+       }
+       defer logger.Debug("fetchAsync >>> done for %v %v", query, variables)
+
+       paramsBytes, err := json.Marshal(query)
+       if err != nil {
+               panic(err)
+       }
+       db := collector.args.Ctx.GetDal()
+       queryStr, _ := graphql.ConstructQuery(query, variables)
+       row := &RawData{
+               Params: collector.params,
+               Data:   paramsBytes,
+               Url:    queryStr,
+               Input:  reqData.InputJSON,
+       }
+       err = db.Create(row, dal.From(collector.table))
+       if err != nil {
+               panic(err)
+       }
+
+       results, err := collector.args.ResponseParser(query, variables)
+       if err != nil {
+               panic(err)
+       }
+
+       RAW_DATA_ORIGIN := "RawDataOrigin"
+       // batch save divider
+       for _, result := range results {
+               // get the batch operator for the specific type
+               batch, err := divider.ForType(reflect.TypeOf(result))
+               if err != nil {
+                       panic(err)
+               }
+               // set raw data origin field
+               origin := 
reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN)
+               if origin.IsValid() {
+                       origin.Set(reflect.ValueOf(common.RawDataOrigin{
+                               RawDataTable:  collector.table,
+                               RawDataId:     row.ID,
+                               RawDataParams: row.Params,
+                       }))
+               }
+               // records get saved into db when slots were max outed
+               err = batch.Add(result)
+               if err != nil {
+                       panic(err)
+               }
+               collector.args.Ctx.IncProgress(1)
+       }
+       if err != nil {
+               panic(err)
+       }
+       collector.args.Ctx.IncProgress(1)
+       if handler != nil {
+               err = handler(query)
+               if err != nil {
+                       panic(err)
+               }
+       }
+}
+
+// compile-time check that GraphqlCollector implements core.SubTask
+var _ core.SubTask = (*GraphqlCollector)(nil)

Reply via email to