This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 4b418b9cc Fix Pagination Bug in Github GraphQL Job Collector and add
configurable page and batch size (#8616)
4b418b9cc is described below
commit 4b418b9cc638c21db0f6ff47de15f0ecf4d31aa6
Author: FlomoN <[email protected]>
AuthorDate: Mon Oct 20 08:43:19 2025 +0200
Fix Pagination Bug in Github GraphQL Job Collector and add configurable
page and batch size (#8616)
* fix: split pagination and batching for github graphql job_collector task
* feat: github graphql job collection mode, page size, batch size
configurable via Environment Variables
---
.../plugins/github_graphql/tasks/job_collector.go | 242 +++++++++++++++++----
1 file changed, 200 insertions(+), 42 deletions(-)
diff --git a/backend/plugins/github_graphql/tasks/job_collector.go
b/backend/plugins/github_graphql/tasks/job_collector.go
index c67d6a879..f0deb7b41 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -34,13 +34,101 @@ import (
const RAW_GRAPHQL_JOBS_TABLE = "github_graphql_jobs"
-type GraphqlQueryCheckRunWrapper struct {
// Supported job collection modes.
const (
	JOB_COLLECTION_MODE_BATCHING   = "BATCHING"
	JOB_COLLECTION_MODE_PAGINATING = "PAGINATING"
)

// Default collection mode (overridable via GITHUB_GRAPHQL_JOB_COLLECTION_MODE):
// BATCHING:   query multiple runs at once, no pagination (may miss jobs if >20 per run)
// PAGINATING: query one run at a time with full pagination (complete data, more API calls)
const DEFAULT_JOB_COLLECTION_MODE = JOB_COLLECTION_MODE_BATCHING

// Mode-specific default values.
const (
	DEFAULT_BATCHING_INPUT_STEP  = 10 // Number of runs per request in BATCHING mode (must be > 1)
	DEFAULT_BATCHING_PAGE_SIZE   = 20 // Jobs per run in BATCHING mode (no pagination)
	PAGINATING_INPUT_STEP        = 1  // Number of runs per request in PAGINATING mode (always 1)
	DEFAULT_PAGINATING_PAGE_SIZE = 50 // Jobs per page in PAGINATING mode (with pagination)
)
+
// JobCollectionConfig holds the effective job-collection settings, resolved
// from environment variables with fallback to the package defaults.
// PageSize and InputStep are derived from the selected Mode.
type JobCollectionConfig struct {
	Mode               string // JOB_COLLECTION_MODE_BATCHING or JOB_COLLECTION_MODE_PAGINATING
	PageSize           int    // derived: effective page size for the selected Mode
	InputStep          int    // derived: effective runs-per-request for the selected Mode
	BatchingInputStep  int    // runs per request when Mode is BATCHING
	BatchingPageSize   int    // jobs per run when Mode is BATCHING
	PaginatingPageSize int    // jobs per page when Mode is PAGINATING
}
+
+// getJobCollectionConfig reads configuration from environment variables with
fallback to defaults
+func getJobCollectionConfig(taskCtx plugin.SubTaskContext)
*JobCollectionConfig {
+ cfg := taskCtx.TaskContext().GetConfigReader()
+
+ config := &JobCollectionConfig{
+ Mode: DEFAULT_JOB_COLLECTION_MODE,
+ BatchingInputStep: DEFAULT_BATCHING_INPUT_STEP,
+ BatchingPageSize: DEFAULT_BATCHING_PAGE_SIZE,
+ PaginatingPageSize: DEFAULT_PAGINATING_PAGE_SIZE,
+ }
+
+ // Read collection mode from environment
+ if mode :=
taskCtx.TaskContext().GetConfig("GITHUB_GRAPHQL_JOB_COLLECTION_MODE"); mode !=
"" {
+ if mode == JOB_COLLECTION_MODE_BATCHING || mode ==
JOB_COLLECTION_MODE_PAGINATING {
+ config.Mode = mode
+ }
+ }
+
+ // Read batching input step (must be > 1)
+ if cfg.IsSet("GITHUB_GRAPHQL_JOB_BATCHING_INPUT_STEP") {
+ if step :=
cfg.GetInt("GITHUB_GRAPHQL_JOB_BATCHING_INPUT_STEP"); step > 1 {
+ config.BatchingInputStep = step
+ }
+ }
+
+ // Read page sizes
+ if cfg.IsSet("GITHUB_GRAPHQL_JOB_BATCHING_PAGE_SIZE") {
+ if size := cfg.GetInt("GITHUB_GRAPHQL_JOB_BATCHING_PAGE_SIZE");
size > 0 {
+ config.BatchingPageSize = size
+ }
+ }
+
+ if cfg.IsSet("GITHUB_GRAPHQL_JOB_PAGINATING_PAGE_SIZE") {
+ if size :=
cfg.GetInt("GITHUB_GRAPHQL_JOB_PAGINATING_PAGE_SIZE"); size > 0 {
+ config.PaginatingPageSize = size
+ }
+ }
+
+ // Set derived values based on mode
+ if config.Mode == JOB_COLLECTION_MODE_PAGINATING {
+ config.PageSize = config.PaginatingPageSize
+ config.InputStep = PAGINATING_INPUT_STEP // Always 1 for
paginating
+ } else {
+ config.PageSize = config.BatchingPageSize
+ config.InputStep = config.BatchingInputStep //
User-configurable for batching
+ }
+
+ return config
+}
+
// GraphqlQueryCheckRunWrapperBatch is the response shape for BATCHING mode:
// a single request resolves an array of check-suite nodes (no pagination).
type GraphqlQueryCheckRunWrapperBatch struct {
	// RateLimit carries the query cost reported by the API.
	RateLimit struct {
		Cost int
	}
	Node []GraphqlQueryCheckSuite `graphql:"node(id: $id)" graphql-extend:"true"`
}
// GraphqlQueryCheckRunWrapperSingle is the response shape for PAGINATING mode:
// a single request resolves exactly one check-suite node, whose check runs are
// fetched page by page.
type GraphqlQueryCheckRunWrapperSingle struct {
	// RateLimit carries the query cost reported by the API.
	RateLimit struct {
		Cost int
	}
	Node GraphqlQueryCheckSuite `graphql:"node(id: $id)"`
}
+
type GraphqlQueryCheckSuite struct {
Id string
Typename string `graphql:"__typename"`
@@ -111,47 +199,81 @@ var CollectJobsMeta = plugin.SubTaskMeta{
var _ plugin.SubTaskEntryPoint = CollectJobs
-func getPageInfo(query interface{}, args *helper.GraphqlCollectorArgs)
(*helper.GraphqlQueryPageInfo, error) {
- queryWrapper := query.(*GraphqlQueryCheckRunWrapper)
- hasNextPage := false
- endCursor := ""
- for _, node := range queryWrapper.Node {
- if node.CheckSuite.CheckRuns.PageInfo.HasNextPage {
- hasNextPage = true
- endCursor = node.CheckSuite.CheckRuns.PageInfo.EndCursor
- break
+// createGetPageInfoFunc returns the appropriate page info function based on
collection mode
+func createGetPageInfoFunc(mode string) func(interface{},
*helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
+ if mode == JOB_COLLECTION_MODE_PAGINATING {
+ // PAGINATING mode: supports full pagination
+ return func(query interface{}, args
*helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
+ queryWrapper :=
query.(*GraphqlQueryCheckRunWrapperSingle)
+ return &helper.GraphqlQueryPageInfo{
+ EndCursor:
queryWrapper.Node.CheckSuite.CheckRuns.PageInfo.EndCursor,
+ HasNextPage:
queryWrapper.Node.CheckSuite.CheckRuns.PageInfo.HasNextPage,
+ }, nil
}
}
- return &helper.GraphqlQueryPageInfo{
- EndCursor: endCursor,
- HasNextPage: hasNextPage,
- }, nil
+
+ // BATCHING mode: no pagination support
+ return func(query interface{}, args *helper.GraphqlCollectorArgs)
(*helper.GraphqlQueryPageInfo, error) {
+ return &helper.GraphqlQueryPageInfo{
+ EndCursor: "",
+ HasNextPage: false,
+ }, nil
+ }
}
-func buildQuery(reqData *helper.GraphqlRequestData) (interface{},
map[string]interface{}, error) {
- query := &GraphqlQueryCheckRunWrapper{}
- if reqData == nil {
- return query, map[string]interface{}{}, nil
- }
- workflowRuns := reqData.Input.([]interface{})
- checkSuiteIds := []map[string]interface{}{}
- for _, iWorkflowRuns := range workflowRuns {
- workflowRun := iWorkflowRuns.(*SimpleWorkflowRun)
- checkSuiteIds = append(checkSuiteIds, map[string]interface{}{
- `id`: graphql.ID(workflowRun.CheckSuiteNodeID),
- })
- }
- variables := map[string]interface{}{
- "node": checkSuiteIds,
- "pageSize": graphql.Int(reqData.Pager.Size),
- "skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
- }
- return query, variables, nil
+// createBuildQueryFunc returns the appropriate build query function based on
collection mode
+func createBuildQueryFunc(mode string) func(*helper.GraphqlRequestData)
(interface{}, map[string]interface{}, error) {
+ if mode == JOB_COLLECTION_MODE_PAGINATING {
+ // PAGINATING mode: single run per request
+ return func(reqData *helper.GraphqlRequestData) (interface{},
map[string]interface{}, error) {
+ if reqData == nil {
+ return &GraphqlQueryCheckRunWrapperSingle{},
map[string]interface{}{}, nil
+ }
+
+ workflowRun := reqData.Input.(*SimpleWorkflowRun)
+ query := &GraphqlQueryCheckRunWrapperSingle{}
+ variables := map[string]interface{}{
+ "id":
graphql.ID(workflowRun.CheckSuiteNodeID),
+ "pageSize": graphql.Int(reqData.Pager.Size),
+ "skipCursor":
(*graphql.String)(reqData.Pager.SkipCursor),
+ }
+ return query, variables, nil
+ }
+ }
+
+ // BATCHING mode: multiple runs per request
+ return func(reqData *helper.GraphqlRequestData) (interface{},
map[string]interface{}, error) {
+ if reqData == nil {
+ return &GraphqlQueryCheckRunWrapperBatch{},
map[string]interface{}{}, nil
+ }
+
+ workflowRuns := reqData.Input.([]interface{})
+ query := &GraphqlQueryCheckRunWrapperBatch{}
+ checkSuiteIds := []map[string]interface{}{}
+ for _, iWorkflowRuns := range workflowRuns {
+ workflowRun := iWorkflowRuns.(*SimpleWorkflowRun)
+ checkSuiteIds = append(checkSuiteIds,
map[string]interface{}{
+ `id`: graphql.ID(workflowRun.CheckSuiteNodeID),
+ })
+ }
+ variables := map[string]interface{}{
+ "node": checkSuiteIds,
+ "pageSize": graphql.Int(reqData.Pager.Size),
+ "skipCursor":
(*graphql.String)(reqData.Pager.SkipCursor),
+ }
+ return query, variables, nil
+ }
}
func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*githubTasks.GithubTaskData)
+ logger := taskCtx.GetLogger()
+
+ // Get configuration from environment variables or defaults
+ config := getJobCollectionConfig(taskCtx)
+ logger.Info("GitHub Job Collector - Mode: %s, InputStep: %d, PageSize:
%d",
+ config.Mode, config.InputStep, config.PageSize)
apiCollector, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
Ctx: taskCtx,
@@ -175,28 +297,40 @@ func CollectJobs(taskCtx plugin.SubTaskContext)
errors.Error {
clauses = append(clauses, dal.Where("github_updated_at > ?",
*apiCollector.GetSince()))
}
- cursor, err := db.Cursor(
- clauses...,
- )
+ cursor, err := db.Cursor(clauses...)
if err != nil {
return err
}
defer cursor.Close()
+
iterator, err := helper.NewDalCursorIterator(db, cursor,
reflect.TypeOf(SimpleWorkflowRun{}))
if err != nil {
return err
}
+ // Create closures that capture the runtime mode configuration
+ buildQueryFunc := createBuildQueryFunc(config.Mode)
+ var getPageInfoFunc func(interface{}, *helper.GraphqlCollectorArgs)
(*helper.GraphqlQueryPageInfo, error)
+
+ if config.Mode == JOB_COLLECTION_MODE_PAGINATING {
+ getPageInfoFunc = createGetPageInfoFunc(config.Mode) // Enable
pagination
+ } else {
+ getPageInfoFunc = nil // Disable pagination for BATCHING mode
+ }
+
err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{
Input: iterator,
- InputStep: 10,
+ InputStep: config.InputStep,
GraphqlClient: data.GraphqlClient,
- BuildQuery: buildQuery,
- GetPageInfo: getPageInfo,
+ BuildQuery: buildQueryFunc,
+ GetPageInfo: getPageInfoFunc, // nil for BATCHING, function
for PAGINATING
ResponseParser: func(queryWrapper any) (messages
[]json.RawMessage, err errors.Error) {
- query := queryWrapper.(*GraphqlQueryCheckRunWrapper)
- for _, node := range query.Node {
+ if config.Mode == JOB_COLLECTION_MODE_PAGINATING {
+ // Single node processing
+ query :=
queryWrapper.(*GraphqlQueryCheckRunWrapperSingle)
+ node := query.Node
runId := node.CheckSuite.WorkflowRun.DatabaseId
+
for _, checkRun := range
node.CheckSuite.CheckRuns.Nodes {
dbCheckRun := &DbCheckRun{
RunId: runId,
@@ -215,11 +349,35 @@ func CollectJobs(taskCtx plugin.SubTaskContext)
errors.Error {
}
messages = append(messages,
errors.Must1(json.Marshal(dbCheckRun)))
}
+ } else {
+ // Batch processing (multiple nodes)
+ query :=
queryWrapper.(*GraphqlQueryCheckRunWrapperBatch)
+ for _, node := range query.Node {
+ runId :=
node.CheckSuite.WorkflowRun.DatabaseId
+ for _, checkRun := range
node.CheckSuite.CheckRuns.Nodes {
+ dbCheckRun := &DbCheckRun{
+ RunId:
runId,
+ GraphqlQueryCheckRun:
&checkRun,
+ }
+ // A checkRun without a
startedAt time is a run that was never started (skipped), GitHub returns
+ // a ZeroTime (Due to the GO
implementation) for startedAt, so we need to check for that here.
+ dbCheckRun.StartedAt =
utils.NilIfZeroTime(dbCheckRun.StartedAt)
+ dbCheckRun.CompletedAt =
utils.NilIfZeroTime(dbCheckRun.CompletedAt)
+ updatedAt :=
dbCheckRun.StartedAt
+ if dbCheckRun.CompletedAt !=
nil {
+ updatedAt =
dbCheckRun.CompletedAt
+ }
+ if apiCollector.GetSince() !=
nil && !apiCollector.GetSince().Before(*updatedAt) {
+ return messages,
helper.ErrFinishCollect
+ }
+ messages = append(messages,
errors.Must1(json.Marshal(dbCheckRun)))
+ }
+ }
}
return
},
IgnoreQueryErrors: true,
- PageSize: 20,
+ PageSize: config.PageSize,
})
if err != nil {
return err