This is an automated email from the ASF dual-hosted git repository. klesh pushed a commit to branch kw-7852-components-field-length in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit d643a16ba2074dc4a34b7e0e379da659f3b5db19 Author: Klesh Wong <[email protected]> AuthorDate: Wed Aug 7 16:22:02 2024 +0800 fix: pull requests not being updated --- backend/go.mod | 16 ++--- backend/go.sum | 36 +++++------ .../pluginhelper/api/api_collector_stateful.go | 14 ++++- .../helpers/pluginhelper/api/graphql_collector.go | 8 +-- .../plugins/github_graphql/tasks/pr_collector.go | 73 ++++++++++++++++++++-- 5 files changed, 108 insertions(+), 39 deletions(-) diff --git a/backend/go.mod b/backend/go.mod index c892dede1..a44f58abe 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -15,7 +15,7 @@ require ( github.com/lib/pq v1.10.2 github.com/libgit2/git2go/v33 v33.0.6 github.com/magiconair/properties v1.8.5 - github.com/merico-dev/graphql v0.0.0-20221027131946-77460a1fd4cd + github.com/merico-dev/graphql v0.0.0-20240807070533-1cafa544cd5d github.com/mitchellh/mapstructure v1.5.0 github.com/panjf2000/ants/v2 v2.4.6 github.com/robfig/cron/v3 v3.0.0 @@ -30,10 +30,10 @@ require ( github.com/swaggo/swag v1.16.1 github.com/tidwall/gjson v1.14.3 github.com/viant/afs v1.16.0 - golang.org/x/crypto v0.21.0 // indirect + golang.org/x/crypto v0.26.0 // indirect golang.org/x/exp v0.0.0-20221028150844-83b7d23a625f golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 - golang.org/x/sync v0.7.0 + golang.org/x/sync v0.8.0 gorm.io/datatypes v1.0.1 gorm.io/driver/mysql v1.5.1 gorm.io/driver/postgres v1.5.2 @@ -105,10 +105,10 @@ require ( github.com/ugorji/go/codec v1.2.11 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect golang.org/x/arch v0.3.0 // indirect - golang.org/x/net v0.22.0 // indirect - golang.org/x/sys v0.18.0 // indirect - golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.13.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.23.0 // indirect + golang.org/x/text v0.17.0 // indirect + golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect @@ -122,7 +122,7 @@ require ( github.com/go-sql-driver/mysql v1.7.1 github.com/golang-jwt/jwt/v5 v5.0.0-rc.1 github.com/rogpeppe/go-internal v1.11.0 - golang.org/x/mod v0.13.0 + golang.org/x/mod v0.17.0 ) //replace github.com/apache/incubator-devlake => ./ diff --git a/backend/go.sum b/backend/go.sum index dd6f20f42..b8106a8c6 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -389,8 +389,8 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-sqlite3 v1.14.5 h1:1IdxlwTNazvbKJQSxoJ5/9ECbEeaTTyeU7sEAZ5KKTQ= github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI= -github.com/merico-dev/graphql v0.0.0-20221027131946-77460a1fd4cd h1:hGQXd4a72JSFIZE+ZVkH5ivE925PGogjob6stgc2too= -github.com/merico-dev/graphql v0.0.0-20221027131946-77460a1fd4cd/go.mod h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ= +github.com/merico-dev/graphql v0.0.0-20240807070533-1cafa544cd5d h1:FpP+YRQudZtnrnIaFvVc87D/WVI7tWi0hBrnRCY8wmQ= +github.com/merico-dev/graphql v0.0.0-20240807070533-1cafa544cd5d/go.mod h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= @@ -564,8 +564,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= -golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= -golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -605,8 +605,8 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY= -golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -650,8 +650,8 @@ golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= -golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= -golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -678,10 +678,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -743,15 +741,15 @@ golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= -golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= +golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -764,8 +762,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -826,8 +824,8 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ= -golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/backend/helpers/pluginhelper/api/api_collector_stateful.go b/backend/helpers/pluginhelper/api/api_collector_stateful.go index 09c1032a7..f4e2c67fe 100644 --- a/backend/helpers/pluginhelper/api/api_collector_stateful.go +++ b/backend/helpers/pluginhelper/api/api_collector_stateful.go @@ -70,7 +70,19 @@ func (m *StatefulApiCollector) InitCollector(args ApiCollectorArgs) errors.Error // InitGraphQLCollector appends a new GraphQL collector to the list func (m *StatefulApiCollector) InitGraphQLCollector(args GraphqlCollectorArgs) errors.Error { args.RawDataSubTaskArgs = m.RawDataSubTaskArgs - args.Incremental = m.CollectorStateManager.IsIncremental() + // highest priority: caller may hardcode the incremental flag. + // e.g. github graphql pr_collector need to refetch OPENing PRs existing in the database + if !args.Incremental { + // medium priority: force incremental flag to false when full sync is enabled + syncPolicy := args.Ctx.TaskContext().SyncPolicy() + if syncPolicy != nil && syncPolicy.FullSync { + args.Incremental = false + } else { + // lowest priority: use the incremental flag from the state manager + args.Incremental = m.CollectorStateManager.IsIncremental() + } + } + graphqlCollector, err := NewGraphqlCollector(args) if err != nil { return err diff --git a/backend/helpers/pluginhelper/api/graphql_collector.go b/backend/helpers/pluginhelper/api/graphql_collector.go index f1d74a7cb..e83435e9b 100644 --- a/backend/helpers/pluginhelper/api/graphql_collector.go +++ b/backend/helpers/pluginhelper/api/graphql_collector.go @@ -138,14 +138,8 @@ func (collector *GraphqlCollector) Execute() errors.Error { if err != nil { return errors.Default.Wrap(err, "error running auto-migrate") } - - isIncremental := collector.args.Incremental - syncPolicy := collector.args.Ctx.TaskContext().SyncPolicy() - if syncPolicy != nil && syncPolicy.FullSync { - isIncremental = false - } // flush data if not incremental collection - if !isIncremental { + if !collector.args.Incremental { err = db.Delete(&RawData{}, dal.From(collector.table), dal.Where("params = ?", collector.params)) if err != nil { return errors.Default.Wrap(err, "error deleting data from collector") diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go b/backend/plugins/github_graphql/tasks/pr_collector.go index 9e457e5b9..d1bf24115 100644 --- a/backend/plugins/github_graphql/tasks/pr_collector.go +++ b/backend/plugins/github_graphql/tasks/pr_collector.go @@ -19,26 +19,26 @@ package tasks import ( "encoding/json" + "reflect" "strings" "time" + "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/plugin" "github.com/apache/incubator-devlake/helpers/pluginhelper/api" + "github.com/apache/incubator-devlake/plugins/github/models" "github.com/apache/incubator-devlake/plugins/github/tasks" "github.com/merico-dev/graphql" ) const RAW_PRS_TABLE = "github_graphql_prs" +// GraphqlQueryPrWrapper is a wrapper for collecting new PRs since the previous collection type GraphqlQueryPrWrapper struct { RateLimit struct { Cost int } - // now it orderBy UPDATED_AT and use cursor pagination - // It may miss some PRs updated when collection. - // Because these missed PRs will be collected on next, But it's not enough. - // So Next Millstone(0.17) we should change it to filter by CREATE_AT + collect detail Repository struct { PullRequests struct { PageInfo *api.GraphqlQueryPageInfo @@ -48,6 +48,16 @@ type GraphqlQueryPrWrapper struct { } `graphql:"repository(owner: $owner, name: $name)"` } +// GraphqlQueryPrDetailWrapper is a wrapper for refetching OPEN PRs from the database to update the details +type GraphqlQueryPrDetailWrapper struct { + RateLimit struct { + Cost int + } + Repository struct { + PullRequests []GraphqlQueryPr `graphql:"pullRequest(number: $number)" graphql-extend:"true"` + } `graphql:"repository(owner: $owner, name: $name)"` +} + type GraphqlQueryPr struct { DatabaseId int Number int @@ -212,5 +222,60 @@ func CollectPrs(taskCtx plugin.SubTaskContext) errors.Error { return err } + db := taskCtx.GetDal() + cursor, err := db.Cursor( + dal.From(models.GithubPullRequest{}.TableName()), + dal.Where("state = ? AND repo_id = ? AND connection_id=?", "OPEN", data.Options.GithubId, data.Options.ConnectionId), + ) + if err != nil { + return err + } + iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.GithubPullRequest{})) + if err != nil { + return err + } + prUpdated := make(map[int]time.Time) + err = apiCollector.InitGraphQLCollector(api.GraphqlCollectorArgs{ + GraphqlClient: data.GraphqlClient, + Input: iterator, + InputStep: 100, + Incremental: true, + BuildQuery: func(reqData *api.GraphqlRequestData) (interface{}, map[string]interface{}, error) { + query := &GraphqlQueryPrDetailWrapper{} + if reqData == nil { + return query, map[string]interface{}{}, nil + } + ownerName := strings.Split(data.Options.Name, "/") + inputPrs := reqData.Input.([]interface{}) + outputPrs := []map[string]interface{}{} + for _, i := range inputPrs { + inputPr := i.(*models.GithubPullRequest) + outputPrs = append(outputPrs, map[string]interface{}{ + `number`: graphql.Int(inputPr.Number), + }) + prUpdated[inputPr.Number] = inputPr.GithubUpdatedAt + } + variables := map[string]interface{}{ + "pullRequest": outputPrs, + "owner": graphql.String(ownerName[0]), + "name": graphql.String(ownerName[1]), + } + return query, variables, nil + }, + ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) { + query := queryWrapper.(*GraphqlQueryPrDetailWrapper) + prs := query.Repository.PullRequests + for _, rawL := range prs { + if rawL.UpdatedAt.After(prUpdated[rawL.Number]) { + messages = append(messages, errors.Must1(json.Marshal(rawL))) + } + } + return + }, + }) + if err != nil { + return err + } + return apiCollector.Execute() }
