This is an automated email from the ASF dual-hosted git repository.
zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 3ebab5a2 feat: graphql POC in github (#2619)
3ebab5a2 is described below
commit 3ebab5a21ae996204a52d1a52f6a864e764250b9
Author: likyh <[email protected]>
AuthorDate: Thu Aug 18 16:55:10 2022 +0800
feat: graphql POC in github (#2619)
* feat: create the github graphql plugin and create some crude helpers
* fix: fix some bugs
* feat: request 100 users pre query
* feat: request events by rest
* fix: record query as url in raw layer
* fix: skip the account which type!='User'
* feat: copy old collector
* fix: ignore collect account not found error
* fix: change TaskData to avoid duplicated collector
Co-authored-by: linyh <[email protected]>
---
go.mod | 8 +-
go.sum | 16 +
plugins/github/tasks/account_collector.go | 1 -
plugins/github/tasks/task_data.go | 9 +-
plugins/github_graphql/plugin_main.go | 171 +++++++++++
plugins/github_graphql/tasks/account_collector.go | 166 ++++++++++
.../tasks/account_graphql_pre_extractor.go | 47 +++
.../tasks/account_rest_pre_extractor.go | 56 ++++
plugins/github_graphql/tasks/issue_collector.go | 238 +++++++++++++++
plugins/github_graphql/tasks/pr_collector.go | 333 +++++++++++++++++++++
plugins/github_graphql/tasks/repo_collector.go | 136 +++++++++
plugins/helper/graphql_async_client.go | 165 ++++++++++
plugins/helper/graphql_collector.go | 309 +++++++++++++++++++
13 files changed, 1648 insertions(+), 7 deletions(-)
diff --git a/go.mod b/go.mod
index 9c33f307..7e042d66 100644
--- a/go.mod
+++ b/go.mod
@@ -26,6 +26,7 @@ require (
go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a
go.temporal.io/sdk v1.14.0
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519
+ golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
gorm.io/datatypes v1.0.1
gorm.io/driver/mysql v1.3.3
@@ -85,6 +86,7 @@ require (
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mattn/go-isatty v0.0.13 // indirect
github.com/mattn/go-sqlite3 v1.14.6 // indirect
+ github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2 //
indirect
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd //
indirect
@@ -97,6 +99,7 @@ require (
github.com/robfig/cron v1.2.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sergi/go-diff v1.1.0 // indirect
+ github.com/shurcooL/graphql v0.0.0-20220606043923-3cf50f8a0a29 //
indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.6-0.20200504143853-81378bbcd8a1 // indirect
@@ -105,12 +108,13 @@ require (
github.com/ugorji/go/codec v1.2.6 // indirect
github.com/xanzy/ssh-agent v0.3.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
- golang.org/x/net v0.0.0-20220708220712-1185a9018129 // indirect
- golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e // indirect
+ golang.org/x/net v0.0.0-20220728211354-c7608f3a8462 // indirect
+ golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
golang.org/x/tools v0.1.11 // indirect
+ google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf //
indirect
google.golang.org/grpc v1.44.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
diff --git a/go.sum b/go.sum
index faf3b124..e0931d46 100644
--- a/go.sum
+++ b/go.sum
@@ -432,6 +432,12 @@ github.com/mattn/go-isatty v0.0.13/go.mod
h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
github.com/mattn/go-sqlite3 v1.14.5/go.mod
h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI=
github.com/mattn/go-sqlite3 v1.14.6
h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod
h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
+github.com/merico-dev/graphql v0.0.0-20220606043923-3cf50f8a0a29
h1:jEarKDWDyd59SKG2AoH/3JYqRFfl7RE2uEzGJRpCl3Q=
+github.com/merico-dev/graphql v0.0.0-20220606043923-3cf50f8a0a29/go.mod
h1:q+XAHrrzQfwrEaGgnl/VduY0Gd9FqSnMQEpKq101TxE=
+github.com/merico-dev/graphql v0.0.0-20220803162350-42fdc19ba54c
h1:7l5zeXBLMWafzHD0ElpaNVTVhNzcQz7Urkw9WebNt5o=
+github.com/merico-dev/graphql v0.0.0-20220803162350-42fdc19ba54c/go.mod
h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ=
+github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2
h1:sOXuZIg3OwBnvJFfIuO8wegiLpeDCOSVvk2dsbjurd8=
+github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2/go.mod
h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ=
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI=
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod
h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/miekg/dns v1.0.14/go.mod
h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
@@ -510,6 +516,8 @@ github.com/shopspring/decimal
v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9Nz
github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc/go.mod
h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shopspring/decimal v1.2.0
h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
github.com/shopspring/decimal v1.2.0/go.mod
h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
+github.com/shurcooL/graphql v0.0.0-20220606043923-3cf50f8a0a29
h1:B1PEwpArrNp4dkQrfxh/abbBAOZBVp0ds+fBEOUOqOc=
+github.com/shurcooL/graphql v0.0.0-20220606043923-3cf50f8a0a29/go.mod
h1:AuYgA5Kyo4c7HfUmvRGs/6rGlMMV/6B1bVnB9JxJEEg=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod
h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.4.1/go.mod
h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2/go.mod
h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
@@ -706,6 +714,10 @@ golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod
h1:CfG3xpIq0wQ8r1q4Su
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220708220712-1185a9018129
h1:vucSRfWwTsoXro7P+3Cjlr6flUMtzCwzlvkxEQtHHB0=
golang.org/x/net v0.0.0-20220708220712-1185a9018129/go.mod
h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.0.0-20220728211354-c7608f3a8462
h1:UreQrH7DbFXSi9ZFox6FNT3WBooWmdANpU+IfkT1T4I=
+golang.org/x/net v0.0.0-20220728211354-c7608f3a8462/go.mod
h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
+golang.org/x/net v0.0.0-20220802222814-0bcc04d9c69b
h1:3ogNYyK4oIQdIKzTu68hQrr4iuVxF3AxKl9Aj/eDrw0=
+golang.org/x/net v0.0.0-20220802222814-0bcc04d9c69b/go.mod
h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -717,6 +729,7 @@ golang.org/x/oauth2
v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod
h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93/go.mod
h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod
h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
+golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602
h1:0Ja1LBD+yisY6RWM/BH7TJVXWsSjs2VwBSmvSX4HdBc=
golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602/go.mod
h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -799,6 +812,8 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod
h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220222200937-f2425489ef4c/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e
h1:NHvCuwuS43lGnYhten69ZWqi2QOj/CiDNcKbVqwVoew=
golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10
h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg=
+golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod
h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
@@ -913,6 +928,7 @@ google.golang.org/appengine v1.5.0/go.mod
h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
google.golang.org/appengine v1.6.1/go.mod
h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
google.golang.org/appengine v1.6.5/go.mod
h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/appengine v1.6.6/go.mod
h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
+google.golang.org/appengine v1.6.7
h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
google.golang.org/appengine v1.6.7/go.mod
h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/genproto v0.0.0-20180518175338-11a468237815/go.mod
h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod
h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
diff --git a/plugins/github/tasks/account_collector.go
b/plugins/github/tasks/account_collector.go
index fbeaa279..829a4917 100644
--- a/plugins/github/tasks/account_collector.go
+++ b/plugins/github/tasks/account_collector.go
@@ -76,7 +76,6 @@ func CollectAccounts(taskCtx core.SubTaskContext) error {
},
AfterResponse: func(res *http.Response) error {
if res.StatusCode == http.StatusNotFound {
- println(res.Request.URL)
return helper.ErrIgnoreAndContinue
}
return nil
diff --git a/plugins/github/tasks/task_data.go
b/plugins/github/tasks/task_data.go
index 1866cdca..14b5980e 100644
--- a/plugins/github/tasks/task_data.go
+++ b/plugins/github/tasks/task_data.go
@@ -36,10 +36,11 @@ type GithubOptions struct {
}
type GithubTaskData struct {
- Options *GithubOptions
- ApiClient *helper.ApiAsyncClient
- Since *time.Time
- Repo *models.GithubRepo
+ Options *GithubOptions
+ ApiClient *helper.ApiAsyncClient
+ GraphqlClient *helper.GraphqlAsyncClient
+ Since *time.Time
+ Repo *models.GithubRepo
}
func DecodeAndValidateTaskOptions(options map[string]interface{})
(*GithubOptions, error) {
diff --git a/plugins/github_graphql/plugin_main.go
b/plugins/github_graphql/plugin_main.go
new file mode 100755
index 00000000..acd6b20e
--- /dev/null
+++ b/plugins/github_graphql/plugin_main.go
@@ -0,0 +1,171 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/github/models"
+ githubTasks "github.com/apache/incubator-devlake/plugins/github/tasks"
+ "github.com/apache/incubator-devlake/plugins/github_graphql/tasks"
+ "github.com/apache/incubator-devlake/plugins/helper"
+ "github.com/apache/incubator-devlake/runner"
+ "github.com/merico-dev/graphql"
+ "github.com/mitchellh/mapstructure"
+ "github.com/spf13/cobra"
+ "github.com/spf13/viper"
+ "golang.org/x/oauth2"
+ "gorm.io/gorm"
+ "reflect"
+ "strings"
+ "time"
+)
+
+// make sure interface is implemented
+var _ core.PluginMeta = (*GithubGraphql)(nil)
+var _ core.PluginInit = (*GithubGraphql)(nil)
+var _ core.PluginTask = (*GithubGraphql)(nil)
+var _ core.PluginApi = (*GithubGraphql)(nil)
+
+// Export a variable named PluginEntry for Framework to search and load
+var PluginEntry GithubGraphql //nolint
+
+type GithubGraphql struct{}
+
+// Description returns a short, human-readable summary of this plugin.
+func (plugin GithubGraphql) Description() string {
+ return "collect some GithubGraphql data"
+}
+
+// Init is a no-op for this plugin: config, logger and db are not used and the
+// method always returns nil.
+func (plugin GithubGraphql) Init(config *viper.Viper, logger core.Logger, db
*gorm.DB) error {
+ return nil
+}
+
+// SubTaskMetas returns the subtasks exposed by this plugin: the GraphQL
+// collectors from this plugin's tasks package (repo, issue, PR), a set of
+// collectors/extractors reused from the github plugin (comments, events,
+// milestones, PR review comments), and finally the GraphQL account collector.
+func (plugin GithubGraphql) SubTaskMetas() []core.SubTaskMeta {
+ return []core.SubTaskMeta{
+ tasks.CollectRepoMeta,
+ tasks.CollectIssueMeta,
+ tasks.CollectPrMeta,
+
+ githubTasks.CollectApiCommentsMeta,
+ githubTasks.ExtractApiCommentsMeta,
+ githubTasks.CollectApiEventsMeta,
+ githubTasks.ExtractApiEventsMeta,
+ githubTasks.CollectMilestonesMeta,
+ githubTasks.ExtractMilestonesMeta,
+ githubTasks.CollectApiPrReviewCommentsMeta,
+ githubTasks.ExtractApiPrReviewCommentsMeta,
+
+ tasks.CollectAccountMeta,
+ }
+}
+
+// GraphQueryRateLimit is the query struct PrepareTaskData passes to
+// client.Query to read the current rate limit: the total limit, the remaining
+// budget and the time at which it resets.
+type GraphQueryRateLimit struct {
+ RateLimit struct {
+ Limit graphql.Int
+ Remaining graphql.Int
+ ResetAt time.Time
+ }
+}
+
+// PrepareTaskData decodes the task options, loads the GitHub connection they
+// reference and builds the clients shared by every subtask of this plugin.
+//
+// It returns an error when the options cannot be decoded, when owner or repo is
+// empty, when the connection cannot be loaded by op.ConnectionId, or when the
+// REST API client cannot be created. On success it returns a
+// *githubTasks.GithubTaskData carrying the options, the REST client and the
+// GraphQL client.
+func (plugin GithubGraphql) PrepareTaskData(taskCtx core.TaskContext, options map[string]interface{}) (interface{}, error) {
+	var op githubTasks.GithubOptions
+	err := mapstructure.Decode(options, &op)
+	if err != nil {
+		return nil, err
+	}
+	if op.Owner == "" {
+		return nil, fmt.Errorf("owner is required for GitHub execution")
+	}
+	if op.Repo == "" {
+		return nil, fmt.Errorf("repo is required for GitHub execution")
+	}
+
+	connectionHelper := helper.NewConnectionHelper(
+		taskCtx,
+		nil,
+	)
+	connection := &models.GithubConnection{}
+	err = connectionHelper.FirstById(connection, op.ConnectionId)
+	if err != nil {
+		return nil, fmt.Errorf("unable to get github connection by the given connection ID: %v", err)
+	}
+
+	// The stored token is split on commas; only the first entry is used to
+	// authenticate the GraphQL client.
+	tokens := strings.Split(connection.Token, ",")
+	src := oauth2.StaticTokenSource(
+		&oauth2.Token{AccessToken: tokens[0]},
+	)
+	httpClient := oauth2.NewClient(taskCtx.GetContext(), src)
+	// NOTE(review): plain concatenation assumes connection.Endpoint already
+	// ends with "/" — confirm against how endpoints are stored.
+	client := graphql.NewClient(connection.Endpoint+`graphql`, httpClient)
+	graphqlClient := helper.CreateAsyncGraphqlClient(taskCtx.GetContext(), client, taskCtx.GetLogger(),
+		func(ctx context.Context, client *graphql.Client, logger core.Logger) (rateRemaining int, resetAt *time.Time, err error) {
+			var query GraphQueryRateLimit
+			// Query with the context handed to this callback instead of the
+			// captured task context, so the caller's context is the one that
+			// governs the request.
+			err = client.Query(ctx, &query, nil)
+			if err != nil {
+				return 0, nil, err
+			}
+			logger.Info(`github graphql init success with remaining %d/%d and will reset at %s`,
+				query.RateLimit.Remaining, query.RateLimit.Limit, query.RateLimit.ResetAt)
+			return int(query.RateLimit.Remaining), &query.RateLimit.ResetAt, nil
+		})
+
+	// The cost of a query is read back by reflection from its RateLimit.Cost
+	// field. q must be a pointer to a struct that has that field; reflection
+	// panics otherwise.
+	graphqlClient.SetGetRateCost(func(q interface{}) int {
+		v := reflect.ValueOf(q)
+		return int(v.Elem().FieldByName(`RateLimit`).FieldByName(`Cost`).Int())
+	})
+
+	apiClient, err := githubTasks.CreateApiClient(taskCtx, connection)
+	if err != nil {
+		return nil, fmt.Errorf("unable to get github API client instance: %v", err)
+	}
+
+	return &githubTasks.GithubTaskData{
+		Options:       &op,
+		ApiClient:     apiClient,
+		GraphqlClient: graphqlClient,
+	}, nil
+}
+
+// RootPkgPath returns the hard-coded root package path of this plugin, because
+// PkgPath information is lost when compiled as plugin(.so).
+// NOTE(review): the returned path ends in "githubGraphql" while this file lives
+// under plugins/github_graphql — confirm the difference is intentional.
+func (plugin GithubGraphql) RootPkgPath() string {
+ return "github.com/apache/incubator-devlake/plugins/githubGraphql"
+}
+
+// ApiResources returns nil: this plugin provides no API resource handlers.
+func (plugin GithubGraphql) ApiResources()
map[string]map[string]core.ApiResourceHandler {
+ return nil
+}
+
+// main runs the plugin in standalone mode for debugging. It declares the
+// connectionId, owner and repo flags, marks all three as required, and passes
+// their values as task options to runner.DirectRun.
+func main() {
+ cmd := &cobra.Command{Use: "githubGraphql"}
+ connectionId := cmd.Flags().Uint64P("connectionId", "c", 0, "github
connection id")
+ owner := cmd.Flags().StringP("owner", "o", "", "github owner")
+ repo := cmd.Flags().StringP("repo", "r", "", "github repo")
+ // The errors from MarkFlagRequired are deliberately discarded.
+ _ = cmd.MarkFlagRequired("connectionId")
+ _ = cmd.MarkFlagRequired("owner")
+ _ = cmd.MarkFlagRequired("repo")
+
+ cmd.Run = func(cmd *cobra.Command, args []string) {
+ runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
+ "connectionId": *connectionId,
+ "owner": *owner,
+ "repo": *repo,
+ })
+ }
+ runner.RunCmd(cmd)
+}
diff --git a/plugins/github_graphql/tasks/account_collector.go
b/plugins/github_graphql/tasks/account_collector.go
new file mode 100755
index 00000000..835eecfe
--- /dev/null
+++ b/plugins/github_graphql/tasks/account_collector.go
@@ -0,0 +1,166 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
+ "github.com/apache/incubator-devlake/plugins/github/models"
+ githubTasks "github.com/apache/incubator-devlake/plugins/github/tasks"
+ "github.com/apache/incubator-devlake/plugins/helper"
+ "github.com/merico-dev/graphql"
+ "reflect"
+)
+
+const RAW_ACCOUNTS_TABLE = "github_graphql_accounts"
+
+// GraphqlQueryAccountWrapper is the query struct used by CollectAccount. It
+// carries the rate-limit cost of the request and the list of users returned.
+// NOTE(review): the `graphql-extend:"true"` tag presumably expands the single
+// `user(login: $login)` selection into one selection per input login — confirm
+// against the merico-dev/graphql fork.
+type GraphqlQueryAccountWrapper struct {
+ RateLimit struct {
+ Cost int
+ }
+ Users []GraphqlQueryAccount `graphql:"user(login: $login)"
graphql-extend:"true"`
+}
+
+// GraphqlQueryAccount is the selection requested for a single user: basic
+// profile fields plus the first 10 organizations of that user.
+type GraphqlQueryAccount struct {
+ Login string
+ Id int `graphql:"databaseId"`
+ Name string
+ Company string
+ Email string
+ AvatarUrl string
+ HtmlUrl string `graphql:"url"`
+ //Type string
+ Organizations struct {
+ Nodes []struct {
+ Email string
+ Name string
+ DatabaseId int
+ Login string
+ }
+ } `graphql:"organizations(first: 10)"`
+}
+
+// CollectAccountMeta describes the CollectAccount subtask; it is enabled by
+// default.
+var CollectAccountMeta = core.SubTaskMeta{
+ Name: "CollectAccount",
+ EntryPoint: CollectAccount,
+ EnabledByDefault: true,
+ Description: "Collect Account data from GithubGraphql api",
+}
+
+// SimpleAccount is the row type CollectAccount scans from its cursor; only the
+// login column is selected.
+type SimpleAccount struct {
+ Login string
+}
+
+var _ core.SubTaskEntryPoint = CollectAccount
+
+// CollectAccount reads the logins recorded in the GithubRepoAccount table for
+// the current repo and connection and feeds them, 100 at a time (InputStep), to
+// a GraphQL collector. Each returned user is converted by convertAccount into
+// an account record plus one record per organization.
+//
+// It returns an error when the cursor, the iterator or the collector cannot be
+// created, or when collector.Execute fails.
+func CollectAccount(taskCtx core.SubTaskContext) error {
+ db := taskCtx.GetDal()
+ data := taskCtx.GetData().(*githubTasks.GithubTaskData)
+
+ // NOTE(review): data.Repo is dereferenced below; presumably it is populated
+ // by an earlier subtask — verify it cannot be nil here.
+ cursor, err := db.Cursor(
+ dal.Select("login"),
+ dal.From(models.GithubRepoAccount{}.TableName()),
+ dal.Where("repo_github_id = ? and connection_id=?",
data.Repo.GithubId, data.Options.ConnectionId),
+ )
+ if err != nil {
+ return err
+ }
+ iterator, err := helper.NewDalCursorIterator(db, cursor,
reflect.TypeOf(SimpleAccount{}))
+ if err != nil {
+ return err
+ }
+
+ collector, err :=
helper.NewGraphqlCollector(helper.GraphqlCollectorArgs{
+ RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+ Ctx: taskCtx,
+ Params: githubTasks.GithubApiParams{
+ ConnectionId: data.Options.ConnectionId,
+ Owner: data.Options.Owner,
+ Repo: data.Options.Repo,
+ },
+ Table: RAW_ACCOUNTS_TABLE,
+ },
+ // NOTE(review): presumably lets collection continue when a login cannot
+ // be resolved — confirm in helper.GraphqlCollector.
+ IgnoreQueryErr: true,
+ Input: iterator,
+ InputStep: 100,
+ GraphqlClient: data.GraphqlClient,
+ // BuildQuery receives one batch of *SimpleAccount rows and builds one
+ // `login` variable map per row.
+ // NOTE(review): the variables are keyed "user" while the struct tag refers
+ // to $login — presumably resolved by the graphql-extend handling; verify.
+ BuildQuery: func(reqData *helper.GraphqlRequestData)
(interface{}, map[string]interface{}, error) {
+ accounts := reqData.Input.([]interface{})
+ query := &GraphqlQueryAccountWrapper{}
+ users := []map[string]interface{}{}
+ for _, iAccount := range accounts {
+ account := iAccount.(*SimpleAccount)
+ users = append(users, map[string]interface{}{
+ `login`: graphql.String(account.Login),
+ })
+ }
+ variables := map[string]interface{}{
+ "user": users,
+ }
+ return query, variables, nil
+ },
+ // ResponseParser flattens every returned user into its account and
+ // account-org records.
+ ResponseParser: func(iQuery interface{}, variables
map[string]interface{}) ([]interface{}, error) {
+ query := iQuery.(*GraphqlQueryAccountWrapper)
+ accounts := query.Users
+
+ results := make([]interface{}, 0, 1)
+ for _, account := range accounts {
+ relatedUsers, err := convertAccount(account,
data.Options.ConnectionId)
+ if err != nil {
+ return nil, err
+ }
+ results = append(results, relatedUsers...)
+ }
+ return results, nil
+ },
+ })
+
+ if err != nil {
+ return err
+ }
+
+ return collector.Execute()
+}
+
+// convertAccount maps one GraphQL user onto tool-layer records: a single
+// GithubAccount (Type is always set to `User`) followed by one
+// GithubAccountOrg per organization node. The returned error is always nil.
+func convertAccount(res GraphqlQueryAccount, connId uint64) ([]interface{},
error) {
+ results := make([]interface{}, 0, len(res.Organizations.Nodes)+1)
+ githubAccount := &models.GithubAccount{
+ ConnectionId: connId,
+ Id: res.Id,
+ Login: res.Login,
+ Name: res.Name,
+ Company: res.Company,
+ Email: res.Email,
+ AvatarUrl: res.AvatarUrl,
+ //Url: res.Url,
+ HtmlUrl: res.HtmlUrl,
+ Type: `User`,
+ }
+ results = append(results, githubAccount)
+ for _, apiAccountOrg := range res.Organizations.Nodes {
+ githubAccountOrg := &models.GithubAccountOrg{
+ ConnectionId: connId,
+ AccountId: res.Id,
+ OrgId: apiAccountOrg.DatabaseId,
+ OrgLogin: apiAccountOrg.Login,
+ }
+ results = append(results, githubAccountOrg)
+ }
+
+ return results, nil
+}
diff --git a/plugins/github_graphql/tasks/account_graphql_pre_extractor.go
b/plugins/github_graphql/tasks/account_graphql_pre_extractor.go
new file mode 100644
index 00000000..432fc5c7
--- /dev/null
+++ b/plugins/github_graphql/tasks/account_graphql_pre_extractor.go
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+ "github.com/apache/incubator-devlake/plugins/github/models"
+)
+
+// GithubAccountEdge lists the user fields requested when an account appears
+// inline in another query; it is embedded by GraphqlInlineAccountQuery.
+type GithubAccountEdge struct {
+ Login string
+ Id int `graphql:"databaseId"`
+ Name string
+ Company string
+ Email string
+ AvatarUrl string
+ HtmlUrl string `graphql:"url"`
+ //Type string
+}
+// GraphqlInlineAccountQuery selects the GithubAccountEdge fields through the
+// inline fragment `... on User`.
+type GraphqlInlineAccountQuery struct {
+ GithubAccountEdge `graphql:"... on User"`
+}
+
+// convertGraphqlPreAccount builds the GithubRepoAccount record that links the
+// login and id of an inline GraphQL account to the given repo and connection.
+// The returned error is always nil.
+func convertGraphqlPreAccount(res GraphqlInlineAccountQuery, repoId int,
connId uint64) (*models.GithubRepoAccount, error) {
+ githubAccount := &models.GithubRepoAccount{
+ ConnectionId: connId,
+ RepoGithubId: repoId,
+ Login: res.Login,
+ AccountId: res.Id,
+ }
+
+ return githubAccount, nil
+}
diff --git a/plugins/github_graphql/tasks/account_rest_pre_extractor.go
b/plugins/github_graphql/tasks/account_rest_pre_extractor.go
new file mode 100644
index 00000000..54a9c772
--- /dev/null
+++ b/plugins/github_graphql/tasks/account_rest_pre_extractor.go
@@ -0,0 +1,56 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+ "github.com/apache/incubator-devlake/plugins/github/models"
+)
+
+// GithubAccountResponse holds the JSON fields of an account object as named by
+// its json tags; convertRestPreAccount reads Type, Login and Id from it.
+type GithubAccountResponse struct {
+ Login string `json:"login"`
+ Id int `json:"id"`
+ NodeId string `json:"node_id"`
+ AvatarUrl string `json:"avatar_url"`
+ GravatarId string `json:"gravatar_id"`
+ Url string `json:"url"`
+ HtmlUrl string `json:"html_url"`
+ FollowersUrl string `json:"followers_url"`
+ FollowingUrl string `json:"following_url"`
+ GistsUrl string `json:"gists_url"`
+ StarredUrl string `json:"starred_url"`
+ SubscriptionsUrl string `json:"subscriptions_url"`
+ OrganizationsUrl string `json:"organizations_url"`
+ ReposUrl string `json:"repos_url"`
+ EventsUrl string `json:"events_url"`
+ ReceivedEventsUrl string `json:"received_events_url"`
+ Type string `json:"type"`
+ SiteAdmin bool `json:"site_admin"`
+}
+
+// convertRestPreAccount turns a REST account into a one-element slice holding
+// the GithubRepoAccount that links it to the given repo and connection.
+// Accounts whose Type is not `User` are skipped: the function then returns
+// (nil, nil). The returned error is always nil.
+func convertRestPreAccount(res *GithubAccountResponse, repoId int, connId
uint64) ([]interface{}, error) {
+ if res.Type != `User` {
+ return nil, nil
+ }
+ githubAccount := &models.GithubRepoAccount{
+ ConnectionId: connId,
+ RepoGithubId: repoId,
+ Login: res.Login,
+ AccountId: res.Id,
+ }
+ return []interface{}{githubAccount}, nil
+}
diff --git a/plugins/github_graphql/tasks/issue_collector.go
b/plugins/github_graphql/tasks/issue_collector.go
new file mode 100755
index 00000000..ae61d89e
--- /dev/null
+++ b/plugins/github_graphql/tasks/issue_collector.go
@@ -0,0 +1,238 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+ "github.com/apache/incubator-devlake/models/domainlayer/ticket"
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/github/models"
+ githubTasks "github.com/apache/incubator-devlake/plugins/github/tasks"
+ "github.com/apache/incubator-devlake/plugins/helper"
+ "github.com/merico-dev/graphql"
+ "time"
+)
+
+const RAW_ISSUES_TABLE = "github_graphql_issues"
+
+// GraphqlQueryIssueWrapper is the query struct used by CollectIssue: the
+// rate-limit cost of the request plus one page of issues of the repository
+// identified by $owner/$name, paged with $pageSize and $skipCursor.
+type GraphqlQueryIssueWrapper struct {
+ RateLimit struct {
+ Cost int
+ }
+ Repository struct {
+ IssueList struct {
+ TotalCount graphql.Int
+ Issues []GraphqlQueryIssue `graphql:"nodes"`
+ PageInfo *helper.GraphqlQueryPageInfo
+ } `graphql:"issues(first: $pageSize, after: $skipCursor)"`
+ } `graphql:"repository(owner: $owner, name: $name)"`
+}
+
+// GraphqlQueryIssue is the selection requested for each issue. Only the first
+// assignee and the first 100 labels are requested.
+// NOTE(review): Milestone and the label fields carry json tags rather than
+// graphql tags — confirm the graphql client ignores them as intended.
+type GraphqlQueryIssue struct {
+ DatabaseId int
+ Number int
+ State string
+ StateReason string
+ Title string
+ Body string
+ Author *GraphqlInlineAccountQuery
+ Url string
+ ClosedAt *time.Time
+ CreatedAt time.Time
+ UpdatedAt time.Time
+ AssigneeList struct {
+ // FIXME now domain layer just support one assignee
+ Assignees []GraphqlInlineAccountQuery `graphql:"nodes"`
+ } `graphql:"assignees(first: 1)"`
+ Milestone *struct {
+ Number int `json:"number"`
+ } `json:"milestone"`
+ Labels struct {
+ Nodes []struct {
+ Id string `json:"id"`
+ Name string `json:"name"`
+ }
+ } `graphql:"labels(first: 100)"`
+}
+
+// CollectIssueMeta describes the CollectIssue subtask; it is enabled by
+// default.
+var CollectIssueMeta = core.SubTaskMeta{
+ Name: "CollectIssue",
+ EntryPoint: CollectIssue,
+ EnabledByDefault: true,
+ Description: "Collect Issue data from GithubGraphql api",
+}
+
+var _ core.SubTaskEntryPoint = CollectIssue
+
+// CollectIssue pages through the repository's issues via the GraphQL API, 100
+// per page. For every issue it hands the collector the issue's labels, the
+// issue itself, and repo-account records for its first assignee and its author
+// when present.
+//
+// It returns an error when the issue regexes cannot be built from the
+// transformation rules, when the collector cannot be created, or when
+// collector.Execute fails.
+func CollectIssue(taskCtx core.SubTaskContext) error {
+	data := taskCtx.GetData().(*githubTasks.GithubTaskData)
+	config := data.Options.TransformationRules
+	issueRegexes, err := githubTasks.NewIssueRegexes(config)
+	if err != nil {
+		// Propagate the failure: returning nil here would report success
+		// while silently skipping issue collection.
+		return err
+	}
+
+	collector, err := helper.NewGraphqlCollector(helper.GraphqlCollectorArgs{
+		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+			Ctx: taskCtx,
+			/*
+				This struct will be JSONEncoded and stored into database along with raw data itself, to identify minimal
+				set of data to be process, for example, we process JiraIssues by Board
+			*/
+			Params: githubTasks.GithubApiParams{
+				ConnectionId: data.Options.ConnectionId,
+				Owner:        data.Options.Owner,
+				Repo:         data.Options.Repo,
+			},
+			Table: RAW_ISSUES_TABLE,
+		},
+		GraphqlClient: data.GraphqlClient,
+		PageSize:      100,
+		// BuildQuery returns the query struct to fill and the variables for
+		// the page described by reqData.Pager.
+		BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
+			query := &GraphqlQueryIssueWrapper{}
+			variables := map[string]interface{}{
+				"pageSize":   graphql.Int(reqData.Pager.Size),
+				"skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
+				"owner":      graphql.String(data.Options.Owner),
+				"name":       graphql.String(data.Options.Repo),
+			}
+			return query, variables, nil
+		},
+		// GetPageInfo exposes the paging information of the issue list.
+		GetPageInfo: func(iQuery interface{}, args *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
+			query := iQuery.(*GraphqlQueryIssueWrapper)
+			return query.Repository.IssueList.PageInfo, nil
+		},
+		// ResponseParser converts one page of issues into tool-layer records.
+		ResponseParser: func(iQuery interface{}, variables map[string]interface{}) ([]interface{}, error) {
+			query := iQuery.(*GraphqlQueryIssueWrapper)
+			issues := query.Repository.IssueList.Issues
+
+			results := make([]interface{}, 0, 1)
+			for _, issue := range issues {
+				githubIssue, err := convertGithubIssue(issue, data.Options.ConnectionId, data.Repo.GithubId)
+				if err != nil {
+					return nil, err
+				}
+				// convertGithubLabels also fills severity/component/priority/type
+				// on githubIssue, so it runs before the issue is appended.
+				githubLabels, err := convertGithubLabels(issueRegexes, issue, githubIssue)
+				if err != nil {
+					return nil, err
+				}
+				results = append(results, githubLabels...)
+				results = append(results, githubIssue)
+				// len of a nil slice is 0, so no separate nil check is needed.
+				if len(issue.AssigneeList.Assignees) > 0 {
+					relatedUser, err := convertGraphqlPreAccount(issue.AssigneeList.Assignees[0], data.Repo.GithubId, data.Options.ConnectionId)
+					if err != nil {
+						return nil, err
+					}
+					results = append(results, relatedUser)
+				}
+				if issue.Author != nil {
+					relatedUser, err := convertGraphqlPreAccount(*issue.Author, data.Repo.GithubId, data.Options.ConnectionId)
+					if err != nil {
+						return nil, err
+					}
+					results = append(results, relatedUser)
+				}
+			}
+			return results, nil
+		},
+	})
+
+	if err != nil {
+		return err
+	}
+
+	return collector.Execute()
+}
+
+// convertGithubIssue maps a GraphQL issue onto a models.GithubIssue for the
+// given connection and repository. Assignee fields come from the first
+// assignee, author fields from Author, and the milestone id from the
+// milestone number, each only when present. LeadTimeMinutes is set only for
+// closed issues, as the minutes between CreatedAt and ClosedAt. The returned
+// error is always nil.
+func convertGithubIssue(issue GraphqlQueryIssue, connectionId uint64,
repositoryId int) (*models.GithubIssue, error) {
+ githubIssue := &models.GithubIssue{
+ ConnectionId: connectionId,
+ GithubId: issue.DatabaseId,
+ RepoId: repositoryId,
+ Number: issue.Number,
+ State: issue.State,
+ Title: issue.Title,
+ Body: issue.Body,
+ Url: issue.Url,
+ ClosedAt: issue.ClosedAt,
+ GithubCreatedAt: issue.CreatedAt,
+ GithubUpdatedAt: issue.UpdatedAt,
+ }
+ if issue.AssigneeList.Assignees != nil &&
len(issue.AssigneeList.Assignees) > 0 {
+ githubIssue.AssigneeId = issue.AssigneeList.Assignees[0].Id
+ githubIssue.AssigneeName = issue.AssigneeList.Assignees[0].Login
+ }
+ if issue.Author != nil {
+ githubIssue.AuthorId = issue.Author.Id
+ githubIssue.AuthorName = issue.Author.Login
+ }
+ if issue.ClosedAt != nil {
+ githubIssue.LeadTimeMinutes =
uint(issue.ClosedAt.Sub(issue.CreatedAt).Minutes())
+ }
+ if issue.Milestone != nil {
+ githubIssue.MilestoneId = issue.Milestone.Number
+ }
+ return githubIssue, nil
+}
+
+// convertGithubLabels returns one models.GithubIssueLabel per label of the
+// issue and, as a side effect, derives fields of githubIssue from the label
+// names:
+//   - Severity, Component and Priority take the first capture group of the
+//     corresponding regex when it matches;
+//   - Type is set to BUG, REQUIREMENT or INCIDENT when the corresponding regex
+//     matches.
+//
+// Regexes that are nil are skipped. When several labels match the same rule,
+// the last matching label wins. The returned error is always nil.
+func convertGithubLabels(issueRegexes *githubTasks.IssueRegexes, issue GraphqlQueryIssue, githubIssue *models.GithubIssue) ([]interface{}, error) {
+	var results []interface{}
+	for _, label := range issue.Labels.Nodes {
+		results = append(results, &models.GithubIssueLabel{
+			ConnectionId: githubIssue.ConnectionId,
+			IssueId:      githubIssue.GithubId,
+			LabelName:    label.Name,
+		})
+		// FindStringSubmatch returns 1+N entries for a pattern with N capture
+		// groups, so groups[1] exists only when len(groups) > 1. Requiring that
+		// avoids an index-out-of-range panic for user-supplied patterns that
+		// have no capture group.
+		if issueRegexes.SeverityRegex != nil {
+			groups := issueRegexes.SeverityRegex.FindStringSubmatch(label.Name)
+			if len(groups) > 1 {
+				githubIssue.Severity = groups[1]
+			}
+		}
+		if issueRegexes.ComponentRegex != nil {
+			groups := issueRegexes.ComponentRegex.FindStringSubmatch(label.Name)
+			if len(groups) > 1 {
+				githubIssue.Component = groups[1]
+			}
+		}
+		if issueRegexes.PriorityRegex != nil {
+			groups := issueRegexes.PriorityRegex.FindStringSubmatch(label.Name)
+			if len(groups) > 1 {
+				githubIssue.Priority = groups[1]
+			}
+		}
+		if issueRegexes.TypeBugRegex != nil {
+			if ok := issueRegexes.TypeBugRegex.MatchString(label.Name); ok {
+				githubIssue.Type = ticket.BUG
+			}
+		}
+		if issueRegexes.TypeRequirementRegex != nil {
+			if ok := issueRegexes.TypeRequirementRegex.MatchString(label.Name); ok {
+				githubIssue.Type = ticket.REQUIREMENT
+			}
+		}
+		if issueRegexes.TypeIncidentRegex != nil {
+			if ok := issueRegexes.TypeIncidentRegex.MatchString(label.Name); ok {
+				githubIssue.Type = ticket.INCIDENT
+			}
+		}
+	}
+	return results, nil
+}
diff --git a/plugins/github_graphql/tasks/pr_collector.go
b/plugins/github_graphql/tasks/pr_collector.go
new file mode 100755
index 00000000..08ec5ed5
--- /dev/null
+++ b/plugins/github_graphql/tasks/pr_collector.go
@@ -0,0 +1,333 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+ "fmt"
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/github/models"
+ githubTasks "github.com/apache/incubator-devlake/plugins/github/tasks"
+ "github.com/apache/incubator-devlake/plugins/helper"
+ "github.com/merico-dev/graphql"
+ "regexp"
+ "runtime/debug"
+ "time"
+)
+
+const RAW_PRS_TABLE = "github_graphql_prs" // raw-data table name handed to the collector in CollectPr
+
+type GraphqlQueryPrWrapper struct { // query struct built by CollectPr's BuildQuery: one page of a repository's pull requests
+ RateLimit struct {
+ Cost int // presumably the rate-limit cost reported for this query — TODO confirm against the GitHub schema
+ }
+ Repository struct {
+ PullRequests struct {
+ PageInfo *helper.GraphqlQueryPageInfo // returned by CollectPr's GetPageInfo to drive pagination
+ Prs []GraphqlQueryPr `graphql:"nodes"`
+ TotalCount graphql.Int
+ } `graphql:"pullRequests(first: $pageSize, after: $skipCursor)"` // CollectPr fills $pageSize and $skipCursor from reqData.Pager
+ } `graphql:"repository(owner: $owner, name: $name)"` // CollectPr fills $owner and $name from data.Options
+}
+
+type GraphqlQueryPr struct { // fields requested for each pull request node of GraphqlQueryPrWrapper
+ DatabaseId int // stored as GithubPullRequest.GithubId by convertGithubPullRequest
+ Number int
+ State string
+ Title string
+ Body string
+ Url string
+ Labels struct {
+ Nodes []struct {
+ Id string
+ Name string // matched against the PrType / PrComponent regexes in CollectPr
+ }
+ } `graphql:"labels(first: 100)"` // only the first 100 labels are requested
+ Author *GraphqlInlineAccountQuery // pointer; nil-checked before use by CollectPr and convertGithubPullRequest
+ Assignees struct {
+ // FIXME now domain layer just support one assignee
+ Assignees []GraphqlInlineAccountQuery `graphql:"nodes"`
+ } `graphql:"assignees(first: 1)"`
+ ClosedAt *time.Time
+ MergedAt *time.Time
+ UpdatedAt time.Time
+ CreatedAt time.Time
+ MergeCommit *struct { // pointer; convertGithubPullRequest only copies Oid when it is non-nil
+ Oid string
+ }
+ HeadRefName string
+ HeadRefOid string
+ BaseRefName string
+ BaseRefOid string
+ Commits struct {
+ PageInfo *helper.GraphqlQueryPageInfo // requested, but CollectPr only reads Nodes
+ Nodes []GraphqlQueryCommit `graphql:"nodes"`
+ TotalCount graphql.Int
+ } `graphql:"commits(first: 100)"` // only the first 100 commits are requested
+ Reviews struct {
+ TotalCount graphql.Int
+ Nodes []GraphqlQueryReview `graphql:"nodes"`
+ } `graphql:"reviews(first: 100)"` // only the first 100 reviews are requested
+}
+
+type GraphqlQueryReview struct { // fields requested for each review node of a pull request
+ Body string
+ Author *GraphqlInlineAccountQuery // pointer, so it can be nil when the response carries no author
+ State string `json:"state"` // CollectPr skips reviews whose State is "PENDING"
+ DatabaseId int `json:"databaseId"` // stored as GithubPrReview.GithubId
+ Commit struct {
+ Oid string // stored as GithubPrReview.CommitSha
+ }
+ SubmittedAt *time.Time `json:"submittedAt"`
+}
+
+type GraphqlQueryCommit struct { // fields requested for each commit node of a pull request
+ Commit struct {
+ Oid string // stored as GithubCommit.Sha and GithubPrCommit.CommitSha
+ Message string
+ Author struct {
+ Name string
+ Email string
+ Date time.Time
+ User *GraphqlInlineAccountQuery // pointer; nil-checked by convertPullRequestCommit and CollectPr
+ }
+ Committer struct {
+ Date time.Time
+ Email string
+ Name string
+ }
+ }
+ Url string
+}
+
+var CollectPrMeta = core.SubTaskMeta{ // subtask metadata naming CollectPr as its entry point
+ Name: "CollectPr",
+ EntryPoint: CollectPr,
+ EnabledByDefault: true,
+ Description: "Collect Pr data from GithubGraphql api",
+}
+
+var _ core.SubTaskEntryPoint = CollectPr // compile-time check that CollectPr is assignable to core.SubTaskEntryPoint
+
+// CollectPr pages through the repository's pull requests via the GitHub
+// GraphQL API and, for every pull request returned, emits the records derived
+// from it: the pull request itself, its labels, its non-pending reviews and
+// their reviewers, its commits, and the accounts referenced by any of them.
+//
+// PrType and PrComponent from the transformation rules are compiled as
+// regular expressions. When a label matches, the first capture group becomes
+// the pull request's Type / Component (the last matching label wins); a
+// pattern without a capture group never sets the field. An expression that
+// fails to compile makes CollectPr return an error before any request is sent.
+func CollectPr(taskCtx core.SubTaskContext) error {
+	data := taskCtx.GetData().(*githubTasks.GithubTaskData)
+	config := data.Options.TransformationRules
+	var labelTypeRegex *regexp.Regexp
+	var labelComponentRegex *regexp.Regexp
+	var err error
+	if prType := config.PrType; len(prType) > 0 {
+		labelTypeRegex, err = regexp.Compile(prType)
+		if err != nil {
+			return fmt.Errorf("regexp Compile prType failed:[%s] stack:[%s]", err.Error(), debug.Stack())
+		}
+	}
+	if prComponent := config.PrComponent; len(prComponent) > 0 {
+		labelComponentRegex, err = regexp.Compile(prComponent)
+		if err != nil {
+			return fmt.Errorf("regexp Compile prComponent failed:[%s] stack:[%s]", err.Error(), debug.Stack())
+		}
+	}
+
+	collector, err := helper.NewGraphqlCollector(helper.GraphqlCollectorArgs{
+		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+			Ctx: taskCtx,
+			Params: githubTasks.GithubApiParams{
+				ConnectionId: data.Options.ConnectionId,
+				Owner:        data.Options.Owner,
+				Repo:         data.Options.Repo,
+			},
+			Table: RAW_PRS_TABLE,
+		},
+		GraphqlClient: data.GraphqlClient,
+		PageSize:      100,
+		// BuildQuery returns the query struct to fill and the variables for
+		// the page described by reqData.Pager.
+		BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
+			query := &GraphqlQueryPrWrapper{}
+			variables := map[string]interface{}{
+				"pageSize":   graphql.Int(reqData.Pager.Size),
+				"skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
+				"owner":      graphql.String(data.Options.Owner),
+				"name":       graphql.String(data.Options.Repo),
+			}
+			return query, variables, nil
+		},
+		GetPageInfo: func(iQuery interface{}, args *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
+			query := iQuery.(*GraphqlQueryPrWrapper)
+			return query.Repository.PullRequests.PageInfo, nil
+		},
+		ResponseParser: func(iQuery interface{}, variables map[string]interface{}) ([]interface{}, error) {
+			query := iQuery.(*GraphqlQueryPrWrapper)
+			prs := query.Repository.PullRequests.Prs
+
+			results := make([]interface{}, 0, len(prs))
+			for _, pr := range prs {
+				githubPr, err := convertGithubPullRequest(pr, data.Options.ConnectionId, data.Repo.GithubId)
+				if err != nil {
+					return nil, err
+				}
+				if pr.Author != nil {
+					githubUser, err := convertGraphqlPreAccount(*pr.Author, data.Repo.GithubId, data.Options.ConnectionId)
+					if err != nil {
+						return nil, err
+					}
+					results = append(results, githubUser)
+				}
+
+				for _, label := range pr.Labels.Nodes {
+					results = append(results, &models.GithubPrLabel{
+						ConnectionId: data.Options.ConnectionId,
+						PullId:       githubPr.GithubId,
+						LabelName:    label.Name,
+					})
+					// FindStringSubmatch puts the whole match at index 0;
+					// index 1 exists only if the pattern has a capture group.
+					if labelTypeRegex != nil {
+						if groups := labelTypeRegex.FindStringSubmatch(label.Name); len(groups) > 1 {
+							githubPr.Type = groups[1]
+						}
+					}
+					if labelComponentRegex != nil {
+						if groups := labelComponentRegex.FindStringSubmatch(label.Name); len(groups) > 1 {
+							githubPr.Component = groups[1]
+						}
+					}
+				}
+				// Appended after the label loop so Type/Component are final.
+				results = append(results, githubPr)
+
+				for _, review := range pr.Reviews.Nodes {
+					if review.State == "PENDING" {
+						continue
+					}
+					githubPrReview := &models.GithubPrReview{
+						ConnectionId:   data.Options.ConnectionId,
+						GithubId:       review.DatabaseId,
+						Body:           review.Body,
+						State:          review.State,
+						CommitSha:      review.Commit.Oid,
+						GithubSubmitAt: review.SubmittedAt,
+						PullRequestId:  githubPr.GithubId,
+					}
+					// Author is a pointer and can be nil (GitHub returns a
+					// null author, e.g. for deleted accounts). Keep the
+					// review, but skip the reviewer and account records that
+					// cannot be built without it.
+					if review.Author != nil {
+						githubPrReview.AuthorUsername = review.Author.Login
+						githubPrReview.AuthorUserId = review.Author.Id
+						results = append(results, &models.GithubReviewer{
+							ConnectionId:  data.Options.ConnectionId,
+							GithubId:      review.Author.Id,
+							Login:         review.Author.Login,
+							PullRequestId: githubPr.GithubId,
+						})
+						githubUser, err := convertGraphqlPreAccount(*review.Author, data.Repo.GithubId, data.Options.ConnectionId)
+						if err != nil {
+							return nil, err
+						}
+						results = append(results, githubUser)
+					}
+					results = append(results, githubPrReview)
+				}
+
+				for _, prCommit := range pr.Commits.Nodes {
+					githubCommit, err := convertPullRequestCommit(prCommit)
+					if err != nil {
+						return nil, err
+					}
+					results = append(results, githubCommit)
+					results = append(results, &models.GithubPrCommit{
+						ConnectionId:  data.Options.ConnectionId,
+						CommitSha:     prCommit.Commit.Oid,
+						PullRequestId: githubPr.GithubId,
+					})
+
+					if prCommit.Commit.Author.User != nil {
+						githubUser, err := convertGraphqlPreAccount(*prCommit.Commit.Author.User, data.Repo.GithubId, data.Options.ConnectionId)
+						if err != nil {
+							return nil, err
+						}
+						results = append(results, githubUser)
+					}
+				}
+			}
+			return results, nil
+		},
+	})
+
+	if err != nil {
+		return err
+	}
+
+	return collector.Execute()
+}
+
+// convertGithubPullRequest maps one pull request node from the GraphQL
+// response onto a GithubPullRequest row owned by connection connId and
+// repository repoId. The merge commit sha and the author fields are only
+// filled in when the response carried them. The returned error is always nil.
+func convertGithubPullRequest(pull GraphqlQueryPr, connId uint64, repoId int) (*models.GithubPullRequest, error) {
+	converted := new(models.GithubPullRequest)
+
+	// identity
+	converted.ConnectionId = connId
+	converted.GithubId = pull.DatabaseId
+	converted.RepoId = repoId
+	converted.Number = pull.Number
+
+	// content
+	converted.State = pull.State
+	converted.Title = pull.Title
+	converted.Url = pull.Url
+	converted.Body = pull.Body
+
+	// timestamps
+	converted.GithubCreatedAt = pull.CreatedAt
+	converted.GithubUpdatedAt = pull.UpdatedAt
+	converted.ClosedAt = pull.ClosedAt
+	converted.MergedAt = pull.MergedAt
+
+	// refs
+	converted.BaseRef = pull.BaseRefName
+	converted.BaseCommitSha = pull.BaseRefOid
+	converted.HeadRef = pull.HeadRefName
+	converted.HeadCommitSha = pull.HeadRefOid
+
+	if mergeCommit := pull.MergeCommit; mergeCommit != nil {
+		converted.MergeCommitSha = mergeCommit.Oid
+	}
+	if author := pull.Author; author != nil {
+		converted.AuthorName = author.Login
+		converted.AuthorId = author.Id
+	}
+	return converted, nil
+}
+
+// convertPullRequestCommit maps one commit node of a pull request onto a
+// GithubCommit row. AuthorId is only set when the commit author is linked to
+// a user in the response. The returned error is always nil.
+func convertPullRequestCommit(prCommit GraphqlQueryCommit) (*models.GithubCommit, error) {
+	src := prCommit.Commit
+	converted := new(models.GithubCommit)
+	converted.Sha = src.Oid
+	converted.Message = src.Message
+	converted.Url = prCommit.Url
+
+	converted.AuthorName = src.Author.Name
+	converted.AuthorEmail = src.Author.Email
+	converted.AuthoredDate = src.Author.Date
+	if user := src.Author.User; user != nil {
+		converted.AuthorId = user.Id
+	}
+
+	converted.CommitterName = src.Committer.Name
+	converted.CommitterEmail = src.Committer.Email
+	converted.CommittedDate = src.Committer.Date
+	return converted, nil
+}
diff --git a/plugins/github_graphql/tasks/repo_collector.go
b/plugins/github_graphql/tasks/repo_collector.go
new file mode 100755
index 00000000..18e1f62b
--- /dev/null
+++ b/plugins/github_graphql/tasks/repo_collector.go
@@ -0,0 +1,136 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/github/models"
+ githubTasks "github.com/apache/incubator-devlake/plugins/github/tasks"
+ "github.com/apache/incubator-devlake/plugins/helper"
+ "github.com/merico-dev/graphql"
+ "time"
+)
+
+const RAW_REPO_TABLE = "github_graphql_repo" // raw-data table name handed to the collector in CollectRepo
+
+var _ core.SubTaskEntryPoint = CollectRepo // compile-time check that CollectRepo is assignable to core.SubTaskEntryPoint
+
+type GraphqlQueryRepo struct { // query struct built by CollectRepo's BuildQuery: a single repository looked up by owner and name
+ RateLimit struct {
+ Cost int // presumably the rate-limit cost reported for this query — TODO confirm against the GitHub schema
+ }
+ Repository struct {
+ Name string `graphql:"name"`
+ GithubId int `graphql:"databaseId"`
+ HTMLUrl string `graphql:"url"`
+ Languages struct {
+ Nodes []struct {
+ Name string
+ }
+ } `graphql:"languages(first: 1)"` // at most one language is requested; Nodes may be empty — verify for repositories without code
+ Description string `graphql:"description"`
+ Owner GraphqlInlineAccountQuery // also converted into an account record by CollectRepo
+ CreatedDate time.Time `graphql:"createdAt"`
+ UpdatedDate *time.Time `graphql:"updatedAt"`
+ Parent *struct { // pointer; CollectRepo copies the parent fields only when it is non-nil
+ GithubId int `graphql:"databaseId"`
+ HTMLUrl string `graphql:"url"`
+ }
+ } `graphql:"repository(owner: $owner, name: $name)"` // CollectRepo fills $owner and $name from data.Options
+}
+
+var CollectRepoMeta = core.SubTaskMeta{ // subtask metadata naming CollectRepo as its entry point
+ Name: "CollectRepo",
+ EntryPoint: CollectRepo,
+ EnabledByDefault: true,
+ Description: "Collect Repo data from GithubGraphql api",
+}
+
+// CollectRepo fetches the repository selected by the task options through the
+// GitHub GraphQL API. The response parser upserts the repository into the
+// database straight away, stores it on the task data as data.Repo, and emits
+// the repository together with its owner account as collected records.
+//
+// The repository's Language is left empty when GitHub reports no language
+// for it.
+func CollectRepo(taskCtx core.SubTaskContext) error {
+	data := taskCtx.GetData().(*githubTasks.GithubTaskData)
+	db := taskCtx.GetDal()
+
+	collector, err := helper.NewGraphqlCollector(helper.GraphqlCollectorArgs{
+		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+			Ctx: taskCtx,
+			// Params is JSON-encoded and stored next to every raw row so the
+			// rows belonging to this connection/owner/repo can be identified
+			// later.
+			Params: githubTasks.GithubApiParams{
+				ConnectionId: data.Options.ConnectionId,
+				Owner:        data.Options.Owner,
+				Repo:         data.Options.Repo,
+			},
+			Table: RAW_REPO_TABLE,
+		},
+		GraphqlClient: data.GraphqlClient,
+		// BuildQuery returns the query struct to fill and its variables; the
+		// repository query is not paginated, so reqData is unused.
+		BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
+			query := &GraphqlQueryRepo{}
+			variables := map[string]interface{}{
+				"owner": graphql.String(data.Options.Owner),
+				"name":  graphql.String(data.Options.Repo),
+			}
+			return query, variables, nil
+		},
+		ResponseParser: func(iQuery interface{}, variables map[string]interface{}) ([]interface{}, error) {
+			query := iQuery.(*GraphqlQueryRepo)
+			repository := query.Repository
+			results := make([]interface{}, 0, 2)
+			githubRepository := &models.GithubRepo{
+				ConnectionId: data.Options.ConnectionId,
+				GithubId:     repository.GithubId,
+				Name:         repository.Name,
+				HTMLUrl:      repository.HTMLUrl,
+				Description:  repository.Description,
+				OwnerId:      repository.Owner.Id,
+				OwnerLogin:   repository.Owner.Login,
+				CreatedDate:  repository.CreatedDate,
+				UpdatedDate:  repository.UpdatedDate,
+			}
+			// The query asks for at most one language and the list can come
+			// back empty, so guard the index instead of reading Nodes[0]
+			// unconditionally.
+			if languages := repository.Languages.Nodes; len(languages) > 0 {
+				githubRepository.Language = languages[0].Name
+			}
+			data.Repo = githubRepository
+
+			if repository.Parent != nil {
+				githubRepository.ParentGithubId = repository.Parent.GithubId
+				githubRepository.ParentHTMLUrl = repository.Parent.HTMLUrl
+			}
+			err := db.CreateOrUpdate(githubRepository)
+			if err != nil {
+				return nil, err
+			}
+			results = append(results, githubRepository)
+
+			githubUser, err := convertGraphqlPreAccount(repository.Owner, data.Repo.GithubId, data.Options.ConnectionId)
+			if err != nil {
+				return nil, err
+			}
+			results = append(results, githubUser)
+			return results, nil
+		},
+	})
+
+	if err != nil {
+		return err
+	}
+
+	return collector.Execute()
+}
diff --git a/plugins/helper/graphql_async_client.go
b/plugins/helper/graphql_async_client.go
new file mode 100644
index 00000000..1dbb4484
--- /dev/null
+++ b/plugins/helper/graphql_async_client.go
@@ -0,0 +1,165 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package helper
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/merico-dev/graphql"
+ "sync"
+ "time"
+)
+
+// GraphqlAsyncClient sends graphql queries one at a time and tracks the remaining rate-limit budget
+type GraphqlAsyncClient struct {
+ ctx context.Context // cancellable context derived from the caller's in CreateAsyncGraphqlClient
+ cancel context.CancelFunc
+ client *graphql.Client
+ logger core.Logger
+ mu sync.Mutex // held for the whole of Query, so queries never overlap
+ waitGroup sync.WaitGroup // incremented by Query and NextTick; awaited by Wait
+ workerErrors []error // errors returned by NextTick tasks, appended by checkError
+
+ rateExhaustCond *sync.Cond // Query waits on this while rateRemaining <= 0
+ rateRemaining int
+ getRateRemaining func(context.Context, *graphql.Client, core.Logger)
(rateRemaining int, resetAt *time.Time, err error)
+ getRateCost func(q interface{}) int // optional; when nil Query charges every query a cost of 1
+}
+
+// CreateAsyncGraphqlClient builds a GraphqlAsyncClient around graphqlClient,
+// bound to a cancellable child of ctx.
+//
+// When getRateRemaining is provided it is called once, synchronously, to seed
+// the rate budget and to start the periodic refresh; a failure of that first
+// call panics. When it is nil the budget stays at 0.
+// NOTE(review): Query blocks while the budget is <= 0, so a nil
+// getRateRemaining looks like it would stall every Query — confirm callers
+// always pass one.
+func CreateAsyncGraphqlClient(
+	ctx context.Context,
+	graphqlClient *graphql.Client,
+	logger core.Logger,
+	getRateRemaining func(context.Context, *graphql.Client, core.Logger) (rateRemaining int, resetAt *time.Time, err error),
+) *GraphqlAsyncClient {
+	childCtx, cancelChild := context.WithCancel(ctx)
+	asyncClient := &GraphqlAsyncClient{
+		ctx:              childCtx,
+		cancel:           cancelChild,
+		client:           graphqlClient,
+		logger:           logger,
+		rateExhaustCond:  sync.NewCond(&sync.Mutex{}),
+		rateRemaining:    0,
+		getRateRemaining: getRateRemaining,
+	}
+	if getRateRemaining == nil {
+		return asyncClient
+	}
+	initialRemaining, initialResetAt, err := getRateRemaining(ctx, graphqlClient, logger)
+	if err != nil {
+		panic(err)
+	}
+	asyncClient.updateRateRemaining(initialRemaining, initialResetAt)
+	return asyncClient
+}
+
+// updateRateRemaining records rateRemaining as the current budget, wakes a
+// Query waiting for budget when it is positive, and schedules the next
+// refresh through getRateRemaining.
+//
+// The next refresh happens at resetAt when that lies in the future, otherwise
+// after 3 minutes. The refresh goroutine exits without refreshing once the
+// client's context is done.
+func (apiClient *GraphqlAsyncClient) updateRateRemaining(rateRemaining int, resetAt *time.Time) {
+	// Query reads and decrements rateRemaining while holding the condition's
+	// lock, so the write must take the same lock to avoid a data race.
+	apiClient.rateExhaustCond.L.Lock()
+	apiClient.rateRemaining = rateRemaining
+	apiClient.rateExhaustCond.L.Unlock()
+	if rateRemaining > 0 {
+		apiClient.rateExhaustCond.Signal()
+	}
+	go func() {
+		nextDuring := 3 * time.Minute
+		if resetAt != nil {
+			if untilReset := time.Until(*resetAt); untilReset > 0 {
+				nextDuring = untilReset
+			}
+		}
+		timer := time.NewTimer(nextDuring)
+		defer timer.Stop()
+		select {
+		case <-apiClient.ctx.Done():
+			// The client was cancelled: stop refreshing rather than leaking
+			// this goroutine until the timer fires.
+			return
+		case <-timer.C:
+		}
+		newRateRemaining, newResetAt, err := apiClient.getRateRemaining(apiClient.ctx, apiClient.client, apiClient.logger)
+		if err != nil {
+			// A failure caused by cancellation is an orderly shutdown, not a
+			// reason to crash the process.
+			if apiClient.ctx.Err() != nil {
+				return
+			}
+			panic(err)
+		}
+		apiClient.updateRateRemaining(newRateRemaining, newResetAt)
+	}()
+}
+
+// SetGetRateCost installs the function Query uses to work out how much rate budget a finished query consumed.
+// If it is never set, every query is charged a cost of 1.
+func (apiClient *GraphqlAsyncClient) SetGetRateCost(getRateCost func(q
interface{}) int) {
+ apiClient.getRateCost = getRateCost
+}
+
+// Query runs q with variables against the wrapped graphql client.
+//
+// Calls are serialized: each one holds the client's mutex for its whole
+// duration. Before sending, Query blocks while the remaining rate budget is
+// <= 0. If the client's context is already done once the budget is available,
+// nothing is sent and nil is returned. After a successful request the budget
+// is reduced by the cost reported by getRateCost (1 when none is set); a
+// failed request returns its error and leaves the budget unchanged.
+func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]interface{}) error {
+	apiClient.waitGroup.Add(1)
+	defer apiClient.waitGroup.Done()
+	apiClient.mu.Lock()
+	defer apiClient.mu.Unlock()
+
+	rateLock := apiClient.rateExhaustCond.L
+	rateLock.Lock()
+	defer rateLock.Unlock()
+	for apiClient.rateRemaining <= 0 {
+		apiClient.logger.Info(`rate limit remaining exhausted, waiting for next period.`)
+		apiClient.rateExhaustCond.Wait()
+	}
+
+	// Err is non-nil exactly when Done is closed, so this is the same
+	// non-blocking cancellation check as a select with a default branch.
+	if apiClient.ctx.Err() != nil {
+		return nil
+	}
+	if err := apiClient.client.Query(apiClient.ctx, q, variables); err != nil {
+		return err
+	}
+	spent := 1
+	if costOf := apiClient.getRateCost; costOf != nil {
+		spent = costOf(q)
+	}
+	apiClient.rateRemaining -= spent
+	apiClient.logger.Debug(`query cost %d in %v`, spent, variables)
+	return nil
+}
+
+// NextTick schedules task to run on its own goroutine and returns
+// immediately. The task is skipped when the client's context is already done;
+// otherwise its returned error, if any, is recorded through checkError.
+//
+// The task is registered with the wait group before the goroutine starts and
+// is only marked done after it has returned, so Wait does not return while a
+// scheduled task is still running.
+func (apiClient *GraphqlAsyncClient) NextTick(task func() error) {
+	// Add before spawning so a concurrent Wait cannot miss the task.
+	apiClient.waitGroup.Add(1)
+	go func() {
+		// Running the task on this goroutine (instead of a nested,
+		// untracked one) keeps the wait group counter above zero until the
+		// task has really finished.
+		defer apiClient.waitGroup.Done()
+		if apiClient.ctx.Err() != nil {
+			return
+		}
+		apiClient.checkError(task())
+	}()
+}
+
+// Wait blocks until the wait group drains, i.e. until every call registered
+// by Query and NextTick has finished. It then returns nil when no worker
+// error was recorded, or a single error whose text is the formatted list of
+// all recorded errors.
+func (apiClient *GraphqlAsyncClient) Wait() error {
+	apiClient.waitGroup.Wait()
+	if len(apiClient.workerErrors) == 0 {
+		return nil
+	}
+	return fmt.Errorf("%s", apiClient.workerErrors)
+}
+
+// checkError appends err to the client's list of worker errors; a nil err is
+// ignored.
+// NOTE(review): the append takes no lock although NextTick calls this from
+// separate goroutines — confirm whether that is safe for the callers in use.
+func (apiClient *GraphqlAsyncClient) checkError(err error) {
+	if err != nil {
+		apiClient.workerErrors = append(apiClient.workerErrors, err)
+	}
+}
+
+// HasError reports whether at least one worker error has been recorded.
+func (apiClient *GraphqlAsyncClient) HasError() bool {
+	recorded := len(apiClient.workerErrors)
+	return recorded != 0
+}
diff --git a/plugins/helper/graphql_collector.go
b/plugins/helper/graphql_collector.go
new file mode 100644
index 00000000..ed69e0cc
--- /dev/null
+++ b/plugins/helper/graphql_collector.go
@@ -0,0 +1,309 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package helper
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/apache/incubator-devlake/models/common"
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
+ "github.com/merico-dev/graphql"
+ "net/http"
+ "reflect"
+)
+
+// CursorPager contains pagination information for a graphql request
+type CursorPager struct {
+ SkipCursor *string // nil for the first page; fetchOneByOne sets it to the previous page's EndCursor
+ Size int // page size, taken from GraphqlCollectorArgs.PageSize
+}
+
+// GraphqlRequestData is the input of `UrlTemplate` `BuildQuery` and `Header`,
so we can generate them dynamically
+type GraphqlRequestData struct { // per-request data passed to GraphqlCollectorArgs.BuildQuery
+ Pager *CursorPager // fetchAsync fills in a first-page pager when this is nil
+ Params interface{}
+ Input interface{} // the item (or []item) fetched from GraphqlCollectorArgs.Input, nil when there is no Input
+ InputJSON []byte // json.Marshal of Input; stored in the raw row's Input column
+}
+
+type GraphqlQueryPageInfo struct { // paging state returned by GraphqlCollectorArgs.GetPageInfo
+ EndCursor string `json:"endCursor"` // used as the next request's SkipCursor
+ HasNextPage bool `json:"hasNextPage"` // fetchOneByOne schedules another page only while this is true
+}
+
+type GraphqlAsyncResponseHandler func(res *http.Response) error // callback type over an HTTP response; not referenced by the collector code visible here
+
+type GraphqlCollectorArgs struct { // configuration accepted by NewGraphqlCollector
+ RawDataSubTaskArgs
+ // BuildQuery returns the query struct to fill and the variables for one request
+ BuildQuery func(reqData *GraphqlRequestData) (query interface{},
variables map[string]interface{}, err error)
+ // PageSize tells GraphqlCollector the page size
+ PageSize int
+ // GraphqlClient is the asynchronous, rate-limited client used to send queries
+ GraphqlClient *GraphqlAsyncClient
+ // Input helps us collect data based on previous collected data, like
collecting changelogs based on jira
+ // issue ids
+ Input Iterator
+ // how many times fetched from input, default 1 means only fetch once
+ // NOTICE: InputStep=1 will fill value as item and InputStep>1 will
fill value as []item
+ InputStep int
+ IgnoreQueryErr bool // when true a failed query is logged and skipped; otherwise fetchAsync panics
+ // GetPageInfo extracts the paging state from a filled query; when nil only a single request is made
+ GetPageInfo func(query interface{}, args *GraphqlCollectorArgs)
(*GraphqlQueryPageInfo, error)
+ BatchSize int // batch size for the save divider; NewGraphqlCollector defaults 0 to 500
+ ResponseParser func(query interface{}, variables
map[string]interface{}) ([]interface{}, error) // required: turns a filled query into the records to save
+}
+
+type GraphqlCollector struct { // collector created by NewGraphqlCollector; Execute runs the collection
+ *RawDataSubTask
+ args *GraphqlCollectorArgs
+}
+
+// NewGraphqlCollector allocates a new GraphqlCollector with the given args.
+// GraphqlCollector can help us collect data from api with ease, pass in a
AsyncGraphqlClient and tell it which part
+// of response we want to save, GraphqlCollector will collect them from remote
server and store them into database.
+func NewGraphqlCollector(args GraphqlCollectorArgs) (*GraphqlCollector, error)
{
+ // process args
+ rawDataSubTask, err := newRawDataSubTask(args.RawDataSubTaskArgs)
+ if err != nil {
+ return nil, err
+ }
+ if err != nil {
+ return nil, fmt.Errorf("Failed to compile UrlTemplate: %w", err)
+ }
+ if args.GraphqlClient == nil {
+ return nil, fmt.Errorf("ApiClient is required")
+ }
+ if args.ResponseParser == nil {
+ return nil, fmt.Errorf("ResponseParser is required")
+ }
+ apicllector := &GraphqlCollector{
+ RawDataSubTask: rawDataSubTask,
+ args: &args,
+ }
+ if args.BatchSize == 0 {
+ args.BatchSize = 500
+ }
+ if args.InputStep == 0 {
+ args.InputStep = 1
+ }
+ //if args.AfterResponse != nil {
+ // apicllector.SetAfterResponse(args.AfterResponse)
+ //} else {
+ // apicllector.SetAfterResponse(func(res *http.Response) error {
+ // if res.StatusCode == http.StatusUnauthorized {
+ // return fmt.Errorf("authentication failed,
please check your AccessToken")
+ // }
+ // return nil
+ // })
+ //}
+ return apicllector, nil
+}
+
+// Execute runs the collection: it makes sure the raw table exists, deletes
+// the raw rows previously stored for the same params, then issues one
+// (possibly paginated) query per input item, or a single one when no Input
+// iterator is configured. Afterwards it waits for all asynchronous work and
+// flushes the batched records.
+//
+// It returns the first failure among: preparing the raw table, fetching from
+// the input iterator, the errors recorded by the graphql client, and
+// flushing the batches. The batches are flushed even when an earlier step
+// failed.
+func (collector *GraphqlCollector) Execute() error {
+	logger := collector.args.Ctx.GetLogger()
+	logger.Info("start graphql collection")
+
+	// make sure table is created
+	db := collector.args.Ctx.GetDal()
+	err := db.AutoMigrate(&RawData{}, dal.From(collector.table))
+	if err != nil {
+		return err
+	}
+
+	// flush data if not incremental collection
+	err = db.Delete(&RawData{}, dal.From(collector.table), dal.Where("params = ?", collector.params))
+	if err != nil {
+		return err
+	}
+	divider := NewBatchSaveDivider(collector.args.Ctx, collector.args.BatchSize, collector.table, collector.params)
+
+	collector.args.Ctx.SetProgress(0, -1)
+	var inputErr error
+	if collector.args.Input != nil {
+		iterator := collector.args.Input
+		defer iterator.Close()
+		apiClient := collector.args.GraphqlClient
+		for inputErr == nil && iterator.HasNext() && !apiClient.HasError() {
+			if collector.args.InputStep == 1 {
+				input, err := iterator.Fetch()
+				if err != nil {
+					inputErr = err
+					break
+				}
+				collector.exec(divider, input)
+				continue
+			}
+			// InputStep > 1: hand a slice of up to InputStep items to one query.
+			var inputs []interface{}
+			for i := 0; i < collector.args.InputStep && iterator.HasNext(); i++ {
+				input, err := iterator.Fetch()
+				if err != nil {
+					inputErr = err
+					break
+				}
+				inputs = append(inputs, input)
+			}
+			if inputErr != nil {
+				// Do not query with a partially fetched batch.
+				break
+			}
+			collector.exec(divider, inputs)
+		}
+	} else {
+		// or we just did it once
+		collector.exec(divider, nil)
+	}
+
+	logger.Debug("wait for all async api to finished")
+
+	waitErr := collector.args.GraphqlClient.Wait()
+	if waitErr != nil {
+		logger.Info("end api collection error: %v", waitErr)
+	} else {
+		logger.Info("end api collection without error")
+	}
+	// Always flush what was collected, but do not let a successful flush
+	// hide an earlier failure.
+	closeErr := divider.Close()
+
+	switch {
+	case inputErr != nil:
+		return fmt.Errorf("fetching collector input: %w", inputErr)
+	case waitErr != nil:
+		return waitErr
+	default:
+		return closeErr
+	}
+}
+
+// exec runs the collection for a single input value. The input is
+// JSON-encoded (panicking if that fails) and wrapped in a first-page request.
+// With GetPageInfo configured the pages are fetched one after another via
+// fetchOneByOne; otherwise a single request is made through fetchAsync.
+func (collector *GraphqlCollector) exec(divider *BatchSaveDivider, input interface{}) {
+	encodedInput, err := json.Marshal(input)
+	if err != nil {
+		panic(err)
+	}
+	firstPage := &GraphqlRequestData{
+		Pager: &CursorPager{
+			SkipCursor: nil,
+			Size:       collector.args.PageSize,
+		},
+		Input:     input,
+		InputJSON: encodedInput,
+	}
+	if collector.args.GetPageInfo == nil {
+		collector.fetchAsync(divider, firstPage, nil)
+		return
+	}
+	collector.fetchOneByOne(divider, firstPage)
+}
+
+// fetchOneByOne fetches every page of a cursor-paginated query. The first
+// page is fetched synchronously; after each page GetPageInfo is consulted
+// and, while it reports a next page, that page is scheduled on the graphql
+// client via NextTick with the previous page's end cursor.
+//
+// Missing page information (a nil result from GetPageInfo) is treated as "no
+// further pages".
+func (collector *GraphqlCollector) fetchOneByOne(divider *BatchSaveDivider, reqData *GraphqlRequestData) {
+	var fetchNextPage func(query interface{}) error
+	fetchNextPage = func(query interface{}) error {
+		pageInfo, err := collector.args.GetPageInfo(query, collector.args)
+		if err != nil {
+			return fmt.Errorf("fetchOneByOne get page info failed: %w", err)
+		}
+		// GetPageInfo returns a pointer; guard against nil before reading it.
+		if pageInfo == nil || !pageInfo.HasNextPage {
+			return nil
+		}
+		collector.args.GraphqlClient.NextTick(func() error {
+			reqDataTemp := &GraphqlRequestData{
+				Pager: &CursorPager{
+					SkipCursor: &pageInfo.EndCursor,
+					Size:       collector.args.PageSize,
+				},
+				Input:     reqData.Input,
+				InputJSON: reqData.InputJSON,
+			}
+			collector.fetchAsync(divider, reqDataTemp, fetchNextPage)
+			return nil
+		})
+		return nil
+	}
+	// fetch first page
+	collector.fetchAsync(divider, reqData, fetchNextPage)
+}
+
+func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider,
reqData *GraphqlRequestData, handler func(query interface{}) error) {
+ if reqData.Pager == nil {
+ reqData.Pager = &CursorPager{
+ SkipCursor: nil,
+ Size: collector.args.PageSize,
+ }
+ }
+ query, variables, err := collector.args.BuildQuery(reqData)
+ if err != nil {
+ panic(err)
+ }
+
+ logger := collector.args.Ctx.GetLogger()
+ err = collector.args.GraphqlClient.Query(query, variables)
+ if err != nil {
+ if collector.args.IgnoreQueryErr {
+ logger.Error("fetchAsync fail and got:", err)
+ return
+ } else {
+ panic(err)
+ }
+ }
+ defer logger.Debug("fetchAsync >>> done for %v %v", query, variables)
+
+ paramsBytes, err := json.Marshal(query)
+ if err != nil {
+ panic(err)
+ }
+ db := collector.args.Ctx.GetDal()
+ queryStr, _ := graphql.ConstructQuery(query, variables)
+ row := &RawData{
+ Params: collector.params,
+ Data: paramsBytes,
+ Url: queryStr,
+ Input: reqData.InputJSON,
+ }
+ err = db.Create(row, dal.From(collector.table))
+ if err != nil {
+ panic(err)
+ }
+
+ results, err := collector.args.ResponseParser(query, variables)
+ if err != nil {
+ panic(err)
+ }
+
+ RAW_DATA_ORIGIN := "RawDataOrigin"
+ // batch save divider
+ for _, result := range results {
+ // get the batch operator for the specific type
+ batch, err := divider.ForType(reflect.TypeOf(result))
+ if err != nil {
+ panic(err)
+ }
+ // set raw data origin field
+ origin :=
reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN)
+ if origin.IsValid() {
+ origin.Set(reflect.ValueOf(common.RawDataOrigin{
+ RawDataTable: collector.table,
+ RawDataId: row.ID,
+ RawDataParams: row.Params,
+ }))
+ }
+ // records get saved into db when slots were max outed
+ err = batch.Add(result)
+ if err != nil {
+ panic(err)
+ }
+ collector.args.Ctx.IncProgress(1)
+ }
+ if err != nil {
+ panic(err)
+ }
+ collector.args.Ctx.IncProgress(1)
+ if handler != nil {
+ err = handler(query)
+ if err != nil {
+ panic(err)
+ }
+ }
+}
+
+// Compile-time check that the collector defined in this file implements
+// core.SubTask.
+var _ core.SubTask = (*GraphqlCollector)(nil)