This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b418b9cc Fix Pagination Bug in Github GraphQL Job Collector and add 
configurable page and batch size (#8616)
4b418b9cc is described below

commit 4b418b9cc638c21db0f6ff47de15f0ecf4d31aa6
Author: FlomoN <[email protected]>
AuthorDate: Mon Oct 20 08:43:19 2025 +0200

    Fix Pagination Bug in Github GraphQL Job Collector and add configurable 
page and batch size (#8616)
    
    * fix: split pagination and batching for github graphql job_collector task
    
    * feat: github graphql job collection mode, page size, batch size 
configurable via Environment Variables
---
 .../plugins/github_graphql/tasks/job_collector.go  | 242 +++++++++++++++++----
 1 file changed, 200 insertions(+), 42 deletions(-)

diff --git a/backend/plugins/github_graphql/tasks/job_collector.go 
b/backend/plugins/github_graphql/tasks/job_collector.go
index c67d6a879..f0deb7b41 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -34,13 +34,101 @@ import (
 
 const RAW_GRAPHQL_JOBS_TABLE = "github_graphql_jobs"
 
-type GraphqlQueryCheckRunWrapper struct {
+// Collection mode configuration
+const (
+       JOB_COLLECTION_MODE_BATCHING   = "BATCHING"
+       JOB_COLLECTION_MODE_PAGINATING = "PAGINATING"
+)
+
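+// Default collection mode; override it with the GITHUB_GRAPHQL_JOB_COLLECTION_MODE environment variable.
+// BATCHING: query multiple runs per request without pagination (a run's jobs beyond the batching page size, 20 by default, are missed).
+// PAGINATING: query one run per request with full pagination (complete data, more API calls).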
+const DEFAULT_JOB_COLLECTION_MODE = JOB_COLLECTION_MODE_BATCHING
+
+// Mode-specific configuration
+const (
+       DEFAULT_BATCHING_INPUT_STEP  = 10 // Number of runs per request in 
BATCHING mode (must be > 1)
+       DEFAULT_BATCHING_PAGE_SIZE   = 20 // Jobs per run in BATCHING mode (no 
pagination)
+       PAGINATING_INPUT_STEP        = 1  // Number of runs per request in 
PAGINATING mode (always 1)
+       DEFAULT_PAGINATING_PAGE_SIZE = 50 // Jobs per page in PAGINATING mode 
(with pagination)
+)
+
+// JobCollectionConfig holds the configuration for job collection
+type JobCollectionConfig struct {
+       Mode               string
+       PageSize           int
+       InputStep          int
+       BatchingInputStep  int
+       BatchingPageSize   int
+       PaginatingPageSize int
+}
+
+// getJobCollectionConfig reads the job collection settings from environment variables, falling back to the defaults for any value that is unset or invalid.
+func getJobCollectionConfig(taskCtx plugin.SubTaskContext) 
*JobCollectionConfig {
+       cfg := taskCtx.TaskContext().GetConfigReader()
+
+       config := &JobCollectionConfig{
+               Mode:               DEFAULT_JOB_COLLECTION_MODE,
+               BatchingInputStep:  DEFAULT_BATCHING_INPUT_STEP,
+               BatchingPageSize:   DEFAULT_BATCHING_PAGE_SIZE,
+               PaginatingPageSize: DEFAULT_PAGINATING_PAGE_SIZE,
+       }
+
+       // Read collection mode from environment
+       if mode := 
taskCtx.TaskContext().GetConfig("GITHUB_GRAPHQL_JOB_COLLECTION_MODE"); mode != 
"" {
+               if mode == JOB_COLLECTION_MODE_BATCHING || mode == 
JOB_COLLECTION_MODE_PAGINATING {
+                       config.Mode = mode
+               }
+       }
+
+       // Read batching input step (must be > 1)
+       if cfg.IsSet("GITHUB_GRAPHQL_JOB_BATCHING_INPUT_STEP") {
+               if step := 
cfg.GetInt("GITHUB_GRAPHQL_JOB_BATCHING_INPUT_STEP"); step > 1 {
+                       config.BatchingInputStep = step
+               }
+       }
+
+       // Read page sizes
+       if cfg.IsSet("GITHUB_GRAPHQL_JOB_BATCHING_PAGE_SIZE") {
+               if size := cfg.GetInt("GITHUB_GRAPHQL_JOB_BATCHING_PAGE_SIZE"); 
size > 0 {
+                       config.BatchingPageSize = size
+               }
+       }
+
+       if cfg.IsSet("GITHUB_GRAPHQL_JOB_PAGINATING_PAGE_SIZE") {
+               if size := 
cfg.GetInt("GITHUB_GRAPHQL_JOB_PAGINATING_PAGE_SIZE"); size > 0 {
+                       config.PaginatingPageSize = size
+               }
+       }
+
+       // Set derived values based on mode
+       if config.Mode == JOB_COLLECTION_MODE_PAGINATING {
+               config.PageSize = config.PaginatingPageSize
+               config.InputStep = PAGINATING_INPUT_STEP // Always 1 for 
paginating
+       } else {
+               config.PageSize = config.BatchingPageSize
+               config.InputStep = config.BatchingInputStep // 
User-configurable for batching
+       }
+
+       return config
+}
+
+// Batch mode: query multiple runs at once (array of nodes)
+type GraphqlQueryCheckRunWrapperBatch struct {
        RateLimit struct {
                Cost int
        }
        Node []GraphqlQueryCheckSuite `graphql:"node(id: $id)" 
graphql-extend:"true"`
 }
 
+// Paginating mode: query single run (single node)
+type GraphqlQueryCheckRunWrapperSingle struct {
+       RateLimit struct {
+               Cost int
+       }
+       Node GraphqlQueryCheckSuite `graphql:"node(id: $id)"`
+}
+
 type GraphqlQueryCheckSuite struct {
        Id       string
        Typename string `graphql:"__typename"`
@@ -111,47 +199,81 @@ var CollectJobsMeta = plugin.SubTaskMeta{
 
 var _ plugin.SubTaskEntryPoint = CollectJobs
 
-func getPageInfo(query interface{}, args *helper.GraphqlCollectorArgs) 
(*helper.GraphqlQueryPageInfo, error) {
-       queryWrapper := query.(*GraphqlQueryCheckRunWrapper)
-       hasNextPage := false
-       endCursor := ""
-       for _, node := range queryWrapper.Node {
-               if node.CheckSuite.CheckRuns.PageInfo.HasNextPage {
-                       hasNextPage = true
-                       endCursor = node.CheckSuite.CheckRuns.PageInfo.EndCursor
-                       break
+// createGetPageInfoFunc returns the appropriate page info function based on 
collection mode
+func createGetPageInfoFunc(mode string) func(interface{}, 
*helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
+       if mode == JOB_COLLECTION_MODE_PAGINATING {
+               // PAGINATING mode: supports full pagination
+               return func(query interface{}, args 
*helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
+                       queryWrapper := 
query.(*GraphqlQueryCheckRunWrapperSingle)
+                       return &helper.GraphqlQueryPageInfo{
+                               EndCursor:   
queryWrapper.Node.CheckSuite.CheckRuns.PageInfo.EndCursor,
+                               HasNextPage: 
queryWrapper.Node.CheckSuite.CheckRuns.PageInfo.HasNextPage,
+                       }, nil
                }
        }
-       return &helper.GraphqlQueryPageInfo{
-               EndCursor:   endCursor,
-               HasNextPage: hasNextPage,
-       }, nil
+
+       // BATCHING mode: no pagination support
+       return func(query interface{}, args *helper.GraphqlCollectorArgs) 
(*helper.GraphqlQueryPageInfo, error) {
+               return &helper.GraphqlQueryPageInfo{
+                       EndCursor:   "",
+                       HasNextPage: false,
+               }, nil
+       }
 }
 
-func buildQuery(reqData *helper.GraphqlRequestData) (interface{}, 
map[string]interface{}, error) {
-       query := &GraphqlQueryCheckRunWrapper{}
-       if reqData == nil {
-               return query, map[string]interface{}{}, nil
-       }
-       workflowRuns := reqData.Input.([]interface{})
-       checkSuiteIds := []map[string]interface{}{}
-       for _, iWorkflowRuns := range workflowRuns {
-               workflowRun := iWorkflowRuns.(*SimpleWorkflowRun)
-               checkSuiteIds = append(checkSuiteIds, map[string]interface{}{
-                       `id`: graphql.ID(workflowRun.CheckSuiteNodeID),
-               })
-       }
-       variables := map[string]interface{}{
-               "node":       checkSuiteIds,
-               "pageSize":   graphql.Int(reqData.Pager.Size),
-               "skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
-       }
-       return query, variables, nil
+// createBuildQueryFunc returns the appropriate build query function based on 
collection mode
+func createBuildQueryFunc(mode string) func(*helper.GraphqlRequestData) 
(interface{}, map[string]interface{}, error) {
+       if mode == JOB_COLLECTION_MODE_PAGINATING {
+               // PAGINATING mode: single run per request
+               return func(reqData *helper.GraphqlRequestData) (interface{}, 
map[string]interface{}, error) {
+                       if reqData == nil {
+                               return &GraphqlQueryCheckRunWrapperSingle{}, 
map[string]interface{}{}, nil
+                       }
+
+                       workflowRun := reqData.Input.(*SimpleWorkflowRun)
+                       query := &GraphqlQueryCheckRunWrapperSingle{}
+                       variables := map[string]interface{}{
+                               "id":         
graphql.ID(workflowRun.CheckSuiteNodeID),
+                               "pageSize":   graphql.Int(reqData.Pager.Size),
+                               "skipCursor": 
(*graphql.String)(reqData.Pager.SkipCursor),
+                       }
+                       return query, variables, nil
+               }
+       }
+
+       // BATCHING mode: multiple runs per request
+       return func(reqData *helper.GraphqlRequestData) (interface{}, 
map[string]interface{}, error) {
+               if reqData == nil {
+                       return &GraphqlQueryCheckRunWrapperBatch{}, 
map[string]interface{}{}, nil
+               }
+
+               workflowRuns := reqData.Input.([]interface{})
+               query := &GraphqlQueryCheckRunWrapperBatch{}
+               checkSuiteIds := []map[string]interface{}{}
+               for _, iWorkflowRuns := range workflowRuns {
+                       workflowRun := iWorkflowRuns.(*SimpleWorkflowRun)
+                       checkSuiteIds = append(checkSuiteIds, 
map[string]interface{}{
+                               `id`: graphql.ID(workflowRun.CheckSuiteNodeID),
+                       })
+               }
+               variables := map[string]interface{}{
+                       "node":       checkSuiteIds,
+                       "pageSize":   graphql.Int(reqData.Pager.Size),
+                       "skipCursor": 
(*graphql.String)(reqData.Pager.SkipCursor),
+               }
+               return query, variables, nil
+       }
 }
 
 func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*githubTasks.GithubTaskData)
+       logger := taskCtx.GetLogger()
+
+       // Get configuration from environment variables or defaults
+       config := getJobCollectionConfig(taskCtx)
+       logger.Info("GitHub Job Collector - Mode: %s, InputStep: %d, PageSize: 
%d",
+               config.Mode, config.InputStep, config.PageSize)
 
        apiCollector, err := 
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
                Ctx: taskCtx,
@@ -175,28 +297,40 @@ func CollectJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
                clauses = append(clauses, dal.Where("github_updated_at > ?", 
*apiCollector.GetSince()))
        }
 
-       cursor, err := db.Cursor(
-               clauses...,
-       )
+       cursor, err := db.Cursor(clauses...)
        if err != nil {
                return err
        }
        defer cursor.Close()
+
        iterator, err := helper.NewDalCursorIterator(db, cursor, 
reflect.TypeOf(SimpleWorkflowRun{}))
        if err != nil {
                return err
        }
 
+       // Create closures that capture the runtime mode configuration
+       buildQueryFunc := createBuildQueryFunc(config.Mode)
+       var getPageInfoFunc func(interface{}, *helper.GraphqlCollectorArgs) 
(*helper.GraphqlQueryPageInfo, error)
+
+       if config.Mode == JOB_COLLECTION_MODE_PAGINATING {
+               getPageInfoFunc = createGetPageInfoFunc(config.Mode) // Enable 
pagination
+       } else {
+               getPageInfoFunc = nil // Disable pagination for BATCHING mode
+       }
+
        err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{
                Input:         iterator,
-               InputStep:     10,
+               InputStep:     config.InputStep,
                GraphqlClient: data.GraphqlClient,
-               BuildQuery:    buildQuery,
-               GetPageInfo:   getPageInfo,
+               BuildQuery:    buildQueryFunc,
+               GetPageInfo:   getPageInfoFunc, // nil for BATCHING, function 
for PAGINATING
                ResponseParser: func(queryWrapper any) (messages 
[]json.RawMessage, err errors.Error) {
-                       query := queryWrapper.(*GraphqlQueryCheckRunWrapper)
-                       for _, node := range query.Node {
+                       if config.Mode == JOB_COLLECTION_MODE_PAGINATING {
+                               // Single node processing
+                               query := 
queryWrapper.(*GraphqlQueryCheckRunWrapperSingle)
+                               node := query.Node
                                runId := node.CheckSuite.WorkflowRun.DatabaseId
+
                                for _, checkRun := range 
node.CheckSuite.CheckRuns.Nodes {
                                        dbCheckRun := &DbCheckRun{
                                                RunId:                runId,
@@ -215,11 +349,35 @@ func CollectJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
                                        }
                                        messages = append(messages, 
errors.Must1(json.Marshal(dbCheckRun)))
                                }
+                       } else {
+                               // Batch processing (multiple nodes)
+                               query := 
queryWrapper.(*GraphqlQueryCheckRunWrapperBatch)
+                               for _, node := range query.Node {
+                                       runId := 
node.CheckSuite.WorkflowRun.DatabaseId
+                                       for _, checkRun := range 
node.CheckSuite.CheckRuns.Nodes {
+                                               dbCheckRun := &DbCheckRun{
+                                                       RunId:                
runId,
+                                                       GraphqlQueryCheckRun: 
&checkRun,
+                                               }
+                                               // A checkRun without a startedAt time is a run that was never started (skipped); GitHub returns
+                                               // a zero time (due to the Go implementation) for startedAt, so we need to check for that here.
+                                               dbCheckRun.StartedAt = 
utils.NilIfZeroTime(dbCheckRun.StartedAt)
+                                               dbCheckRun.CompletedAt = 
utils.NilIfZeroTime(dbCheckRun.CompletedAt)
+                                               updatedAt := 
dbCheckRun.StartedAt
+                                               if dbCheckRun.CompletedAt != 
nil {
+                                                       updatedAt = 
dbCheckRun.CompletedAt
+                                               }
+                                               if apiCollector.GetSince() != 
nil && !apiCollector.GetSince().Before(*updatedAt) {
+                                                       return messages, 
helper.ErrFinishCollect
+                                               }
+                                               messages = append(messages, 
errors.Must1(json.Marshal(dbCheckRun)))
+                                       }
+                               }
                        }
                        return
                },
                IgnoreQueryErrors: true,
-               PageSize:          20,
+               PageSize:          config.PageSize,
        })
        if err != nil {
                return err

Reply via email to