This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 80ec49b55 feat(circleci): Cherry Pick #7820 #7986 (#8341)
80ec49b55 is described below

commit 80ec49b55820fc6ac7f7a9cd29828c9d68810295
Author: John Ibsen <[email protected]>
AuthorDate: Tue Mar 18 21:36:00 2025 -0400

    feat(circleci): Cherry Pick #7820 #7986 (#8341)
    
    * feat(circleci-plugin): incremental data collection (#7986)
    
    * feat(api_collector_stateful): handle case where response has records from 
both before & after createdAfter of last collection
    
    * feat(circleci-plugin): incremental collection for pipelines
    
    * feat(api_collector_stateful): expose Input collector arg for 
StatefulFinalizableEntity to collect data based on previous collection
    
    * feat(circleci-plugin): incremental data collection for workflows
    
    * feat(circleci-plugin): incremental data collection for jobs
    
    * refactor(circleci-plugin): use common query param function
    
    * refactor(circleci-plugin): use BuildQueryParamsWithPageToken func when 
collecting unfinished job details
    
    * fix(circleci-plugin): pipeline collector time range (#7820)
    
    * fix(circleci-plugin): only collect pipelines from after data time range
    
    * fix(circleci-plugin): ignore 404 not found when collecting jobs or 
workflows
    
    * Cleanup from bad merge
    
    ---------
    
    Co-authored-by: Nick Williams <[email protected]>
    Co-authored-by: John Ibsen <[email protected]>
---
 .../pluginhelper/api/api_collector_stateful.go     | 38 +++++++++-
 backend/plugins/circleci/tasks/job_collector.go    | 87 ++++++++++++++++------
 .../plugins/circleci/tasks/pipeline_collector.go   | 45 +++++++++--
 backend/plugins/circleci/tasks/shared.go           | 27 ++++++-
 .../plugins/circleci/tasks/workflow_collector.go   | 84 +++++++++++++++------
 5 files changed, 220 insertions(+), 61 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/api_collector_stateful.go 
b/backend/helpers/pluginhelper/api/api_collector_stateful.go
index f4e2c67fe..985392f7b 100644
--- a/backend/helpers/pluginhelper/api/api_collector_stateful.go
+++ b/backend/helpers/pluginhelper/api/api_collector_stateful.go
@@ -139,10 +139,19 @@ func NewStatefulApiCollectorForFinalizableEntity(args 
FinalizableApiCollectorArg
        createdAfter := manager.CollectorStateManager.GetSince()
        isIncremental := manager.CollectorStateManager.IsIncremental()
 
+       var inputIterator Iterator
+       if args.CollectNewRecordsByList.BuildInputIterator != nil {
+               inputIterator, err = 
args.CollectNewRecordsByList.BuildInputIterator(isIncremental, createdAfter)
+               if err != nil {
+                       return nil, err
+               }
+       }
+
        // step 1: create a collector to collect newly added records
        err = manager.InitCollector(ApiCollectorArgs{
                ApiClient: args.ApiClient,
                // common
+               Input:       inputIterator,
                Incremental: isIncremental,
                UrlTemplate: args.CollectNewRecordsByList.UrlTemplate,
                Query: func(reqData *RequestData) (url.Values, errors.Error) {
@@ -169,21 +178,41 @@ func NewStatefulApiCollectorForFinalizableEntity(args 
FinalizableApiCollectorArg
 
                        // time filter or diff sync
                        if createdAfter != nil && 
args.CollectNewRecordsByList.GetCreated != nil {
-                               // if the first record of the page was created 
before createdAfter, return emtpy set and stop
+                               // if the first record of the page was created 
before createdAfter and not a zero value, return empty set and stop
                                firstCreated, err := 
args.CollectNewRecordsByList.GetCreated(items[0])
                                if err != nil {
                                        return nil, err
                                }
-                               if firstCreated.Before(*createdAfter) {
+                               if firstCreated.Before(*createdAfter) && 
!firstCreated.IsZero() {
                                        return nil, ErrFinishCollect
                                }
-                               // if the last record was created before 
createdAfter, return records and stop
+
+                               // If last record was created before 
CreatedAfter, including a zero value, check each record individually
                                lastCreated, err := 
args.CollectNewRecordsByList.GetCreated(items[len(items)-1])
                                if err != nil {
                                        return nil, err
                                }
                                if lastCreated.Before(*createdAfter) {
-                                       return items, ErrFinishCollect
+                                       var validItems []json.RawMessage
+                                       // Only collect items that were created 
after the last successful collection to prevent duplicates
+                                       for _, item := range items {
+                                               itemCreatedAt, err := 
args.CollectNewRecordsByList.GetCreated(item)
+                                               if err != nil {
+                                                       return nil, err
+                                               }
+
+                                               if itemCreatedAt.IsZero() {
+                                                       // If zero then 
timestamp is null on the response - accept as valid for downstream processing
+                                                       validItems = 
append(validItems, item)
+                                                       continue
+                                               }
+
+                                               if 
itemCreatedAt.Before(*createdAfter) {
+                                                       // Once we reach an 
item that was created before the last successful collection, stop & return
+                                                       return validItems, 
ErrFinishCollect
+                                               }
+                                               validItems = append(validItems, 
item)
+                                       }
                                }
                        }
                        return items, err
@@ -267,6 +296,7 @@ type FinalizableApiCollectorListArgs struct {
        Concurrency           int                                               
                                          // required for Undetermined 
Strategy, number of concurrent requests
        GetNextPageCustomData func(prevReqData *RequestData, prevPageResponse 
*http.Response) (interface{}, errors.Error) // required for Sequential 
Strategy, to extract the next page cursor from the given response
        GetTotalPages         func(res *http.Response, args *ApiCollectorArgs) 
(int, errors.Error)                        // required for Determined Strategy, 
to extract the total number of pages from the given response
+       BuildInputIterator    func(isIncremental bool, createdAfter *time.Time) 
(Iterator, errors.Error)
 }
 
 // FinalizableApiCollectorDetailArgs is the arguments for the detail collector
diff --git a/backend/plugins/circleci/tasks/job_collector.go 
b/backend/plugins/circleci/tasks/job_collector.go
index 1f2f858f5..fd1d78286 100644
--- a/backend/plugins/circleci/tasks/job_collector.go
+++ b/backend/plugins/circleci/tasks/job_collector.go
@@ -18,12 +18,15 @@ limitations under the License.
 package tasks
 
 import (
+       "encoding/json"
+       "reflect"
+       "time"
+
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "github.com/apache/incubator-devlake/plugins/circleci/models"
-       "reflect"
 )
 
 const RAW_JOB_TABLE = "circleci_api_jobs"
@@ -43,30 +46,68 @@ func CollectJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
        logger := taskCtx.GetLogger()
        logger.Info("collect jobs")
 
-       clauses := []dal.Clause{
-               dal.Select("id, pipeline_id"),
-               dal.From(&models.CircleciWorkflow{}),
-               dal.Where("_tool_circleci_workflows.connection_id = ? and 
_tool_circleci_workflows.project_slug = ? ", data.Options.ConnectionId, 
data.Options.ProjectSlug),
-       }
+       collector, err := 
api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
+               RawDataSubTaskArgs: *rawDataSubTaskArgs,
+               ApiClient:          data.ApiClient,
+               CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
+                       PageSize:              int(data.Options.PageSize),
+                       GetNextPageCustomData: ExtractNextPageToken,
+                       BuildInputIterator: func(isIncremental bool, 
createdAfter *time.Time) (api.Iterator, errors.Error) {
+                               clauses := []dal.Clause{
+                                       dal.Select("id, pipeline_id"), // 
pipeline_id not on individual job response but required for result
+                                       dal.From(&models.CircleciWorkflow{}),
+                                       dal.Where("connection_id = ? and 
project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug),
+                               }
 
-       db := taskCtx.GetDal()
-       cursor, err := db.Cursor(clauses...)
-       if err != nil {
-               return err
-       }
-       iterator, err := api.NewDalCursorIterator(db, cursor, 
reflect.TypeOf(models.CircleciWorkflow{}))
-       if err != nil {
-               return err
-       }
+                               if isIncremental {
+                                       clauses = append(clauses, 
dal.Where("created_date > ?", createdAfter))
+                               }
+
+                               db := taskCtx.GetDal()
+                               cursor, err := db.Cursor(clauses...)
+                               if err != nil {
+                                       return nil, err
+                               }
+                               return api.NewDalCursorIterator(db, cursor, 
reflect.TypeOf(models.CircleciWorkflow{}))
+                       },
+                       FinalizableApiCollectorCommonArgs: 
api.FinalizableApiCollectorCommonArgs{
+                               UrlTemplate:    "/v2/workflow/{{ .Input.Id 
}}/job",
+                               Query:          BuildQueryParamsWithPageToken,
+                               ResponseParser: ParseCircleciPageTokenResp,
+                               AfterResponse:  ignoreDeletedBuilds, // Ignore 
the 404 response if a workflow has been deleted
+                       },
+                       GetCreated: func(item json.RawMessage) (time.Time, 
errors.Error) {
+                               var job struct { // Individual job response 
lacks created_at field, so have to use started_at
+                                       CreatedAt time.Time `json:"started_at"` 
// This will be null in some cases (e.g. queued, not_running, blocked)
+                               }
+                               if err := json.Unmarshal(item, &job); err != 
nil {
+                                       return time.Time{}, 
errors.Default.Wrap(err, "failed to unmarshal job")
+                               }
+                               return job.CreatedAt, nil
+                       },
+               },
+               CollectUnfinishedDetails: 
&api.FinalizableApiCollectorDetailArgs{
+                       FinalizableApiCollectorCommonArgs: 
api.FinalizableApiCollectorCommonArgs{
+                               UrlTemplate:    "/v2/workflow/{{ .Input.Id 
}}/job", // The individual job endpoint has different fields so need to 
recollect all jobs for a workflow
+                               Query:          BuildQueryParamsWithPageToken,
+                               ResponseParser: ParseCircleciPageTokenResp,
+                               AfterResponse:  ignoreDeletedBuilds,
+                       },
+                       BuildInputIterator: func() (api.Iterator, errors.Error) 
{
+                               clauses := []dal.Clause{
+                                       dal.Select("DISTINCT workflow_id"), // 
Only need to recollect jobs for a workflow once
+                                       dal.From(&models.CircleciJob{}),
+                                       dal.Where("connection_id = ? AND 
project_slug = ? AND status IN ('running', 'not_running', 'queued', 
'on_hold')", data.Options.ConnectionId, data.Options.ProjectSlug),
+                               }
 
-       collector, err := api.NewApiCollector(api.ApiCollectorArgs{
-               RawDataSubTaskArgs:    *rawDataSubTaskArgs,
-               ApiClient:             data.ApiClient,
-               UrlTemplate:           "/v2/workflow/{{ .Input.Id }}/job",
-               Input:                 iterator,
-               GetNextPageCustomData: ExtractNextPageToken,
-               Query:                 BuildQueryParamsWithPageToken,
-               ResponseParser:        ParseCircleciPageTokenResp,
+                               db := taskCtx.GetDal()
+                               cursor, err := db.Cursor(clauses...)
+                               if err != nil {
+                                       return nil, err
+                               }
+                               return api.NewDalCursorIterator(db, cursor, 
reflect.TypeOf(models.CircleciJob{}))
+                       },
+               },
        })
        if err != nil {
                logger.Error(err, "collect jobs error")
diff --git a/backend/plugins/circleci/tasks/pipeline_collector.go 
b/backend/plugins/circleci/tasks/pipeline_collector.go
index b7940e6c8..2dc8da4f3 100644
--- a/backend/plugins/circleci/tasks/pipeline_collector.go
+++ b/backend/plugins/circleci/tasks/pipeline_collector.go
@@ -18,6 +18,9 @@ limitations under the License.
 package tasks
 
 import (
+       "encoding/json"
+       "net/http"
+
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
@@ -38,15 +41,41 @@ var CollectPipelinesMeta = plugin.SubTaskMeta{
 func CollectPipelines(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_PIPELINE_TABLE)
        logger := taskCtx.GetLogger()
+       timeAfter := rawDataSubTaskArgs.Ctx.TaskContext().SyncPolicy().TimeAfter
        logger.Info("collect pipelines")
-       collector, err := api.NewApiCollector(api.ApiCollectorArgs{
-               RawDataSubTaskArgs:    *rawDataSubTaskArgs,
-               ApiClient:             data.ApiClient,
-               UrlTemplate:           "/v2/project/{{ .Params.ProjectSlug 
}}/pipeline",
-               PageSize:              int(data.Options.PageSize),
-               GetNextPageCustomData: ExtractNextPageToken,
-               Query:                 BuildQueryParamsWithPageToken,
-               ResponseParser:        ParseCircleciPageTokenResp,
+       collector, err := 
api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
+               RawDataSubTaskArgs: *rawDataSubTaskArgs,
+               ApiClient:          data.ApiClient,
+               CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
+                       PageSize:              int(data.Options.PageSize),
+                       GetNextPageCustomData: ExtractNextPageToken,
+                       FinalizableApiCollectorCommonArgs: 
api.FinalizableApiCollectorCommonArgs{
+                               UrlTemplate: "/v2/project/{{ 
.Params.ProjectSlug }}/pipeline",
+                               Query:       BuildQueryParamsWithPageToken,
+                               ResponseParser: func(res *http.Response) 
([]json.RawMessage, errors.Error) {
+                                       data := 
CircleciPageTokenResp[[]json.RawMessage]{}
+                                       err := api.UnmarshalResponse(res, &data)
+
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       filteredItems := []json.RawMessage{}
+                                       for _, item := range data.Items {
+                                               pipelineCreatedAt, err := 
extractCreatedAt(item)
+
+                                               if err != nil {
+                                                       return nil, err
+                                               }
+                                               if 
pipelineCreatedAt.Before(*timeAfter) {
+                                                       return filteredItems, 
api.ErrFinishCollect
+                                               }
+                                               filteredItems = 
append(filteredItems, item)
+                                       }
+                                       return filteredItems, nil
+                               },
+                       },
+                       GetCreated: extractCreatedAt,
+               },
        })
        if err != nil {
                logger.Error(err, "collect pipelines error")
diff --git a/backend/plugins/circleci/tasks/shared.go 
b/backend/plugins/circleci/tasks/shared.go
index dd2c4944a..6e235ecd6 100644
--- a/backend/plugins/circleci/tasks/shared.go
+++ b/backend/plugins/circleci/tasks/shared.go
@@ -19,14 +19,16 @@ package tasks
 
 import (
        "encoding/json"
+       "net/http"
+       "net/url"
+       "time"
+
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/models/domainlayer/didgen"
        "github.com/apache/incubator-devlake/core/plugin"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "github.com/apache/incubator-devlake/plugins/circleci/models"
-       "net/http"
-       "net/url"
 )
 
 var accountIdGen *didgen.DomainIdGenerator
@@ -107,7 +109,7 @@ func ExtractNextPageToken(prevReqData *api.RequestData, 
prevPageResponse *http.R
        return res.NextPageToken, nil
 }
 
-func BuildQueryParamsWithPageToken(reqData *api.RequestData) (url.Values, 
errors.Error) {
+func BuildQueryParamsWithPageToken(reqData *api.RequestData, _ *time.Time) 
(url.Values, errors.Error) {
        query := url.Values{}
        if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" {
                query.Set("page-token", pageToken)
@@ -120,3 +122,22 @@ func ParseCircleciPageTokenResp(res *http.Response) 
([]json.RawMessage, errors.E
        err := api.UnmarshalResponse(res, &data)
        return data.Items, err
 }
+
+func ignoreDeletedBuilds(res *http.Response) errors.Error {
+       // CircleCI API will return a 404 response for a workflow/job that has 
been deleted
+       // due to their data retention policy. We should ignore these errors.
+       if res.StatusCode == http.StatusNotFound {
+               return api.ErrIgnoreAndContinue
+       }
+       return nil
+}
+
+func extractCreatedAt(item json.RawMessage) (time.Time, errors.Error) {
+       var entity struct {
+               CreatedAt time.Time `json:"created_at"`
+       }
+       if err := json.Unmarshal(item, &entity); err != nil {
+               return time.Time{}, errors.Default.Wrap(err, "failed to 
unmarshal item")
+       }
+       return entity.CreatedAt, nil
+}
diff --git a/backend/plugins/circleci/tasks/workflow_collector.go 
b/backend/plugins/circleci/tasks/workflow_collector.go
index 8234eceac..f0f3aebe5 100644
--- a/backend/plugins/circleci/tasks/workflow_collector.go
+++ b/backend/plugins/circleci/tasks/workflow_collector.go
@@ -18,12 +18,16 @@ limitations under the License.
 package tasks
 
 import (
+       "encoding/json"
+       "net/http"
+       "reflect"
+       "time"
+
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "github.com/apache/incubator-devlake/plugins/circleci/models"
-       "reflect"
 )
 
 const RAW_WORKFLOW_TABLE = "circleci_api_workflows"
@@ -43,30 +47,64 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext) 
errors.Error {
        logger := taskCtx.GetLogger()
        logger.Info("collect workflows")
 
-       clauses := []dal.Clause{
-               dal.Select("id"),
-               dal.From(&models.CircleciPipeline{}),
-               dal.Where("_tool_circleci_pipelines.connection_id = ? and 
_tool_circleci_pipelines.project_slug = ? ", data.Options.ConnectionId, 
data.Options.ProjectSlug),
-       }
+       collector, err := 
api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
+               RawDataSubTaskArgs: *rawDataSubTaskArgs,
+               ApiClient:          data.ApiClient,
+               CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
+                       PageSize:              int(data.Options.PageSize),
+                       GetNextPageCustomData: ExtractNextPageToken,
+                       BuildInputIterator: func(isIncremental bool, 
createdAfter *time.Time) (api.Iterator, errors.Error) {
+                               clauses := []dal.Clause{
+                                       dal.Select("id"),
+                                       dal.From(&models.CircleciPipeline{}),
+                                       dal.Where("connection_id = ? AND 
project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug),
+                               }
 
-       db := taskCtx.GetDal()
-       cursor, err := db.Cursor(clauses...)
-       if err != nil {
-               return err
-       }
-       iterator, err := api.NewDalCursorIterator(db, cursor, 
reflect.TypeOf(models.CircleciPipeline{}))
-       if err != nil {
-               return err
-       }
+                               if isIncremental {
+                                       clauses = append(clauses, 
dal.Where("created_date > ?", createdAfter))
+                               }
+
+                               db := taskCtx.GetDal()
+                               cursor, err := db.Cursor(clauses...)
+                               if err != nil {
+                                       return nil, err
+                               }
+                               return api.NewDalCursorIterator(db, cursor, 
reflect.TypeOf(models.CircleciPipeline{}))
+                       },
+                       FinalizableApiCollectorCommonArgs: 
api.FinalizableApiCollectorCommonArgs{
+                               UrlTemplate:    "/v2/pipeline/{{ .Input.Id 
}}/workflow",
+                               Query:          BuildQueryParamsWithPageToken,
+                               ResponseParser: ParseCircleciPageTokenResp,
+                               AfterResponse:  ignoreDeletedBuilds, // Ignore 
the 404 response if a workflow has been deleted
+                       },
+                       GetCreated: extractCreatedAt,
+               },
+               CollectUnfinishedDetails: 
&api.FinalizableApiCollectorDetailArgs{
+                       FinalizableApiCollectorCommonArgs: 
api.FinalizableApiCollectorCommonArgs{
+                               UrlTemplate: "/v2/workflow/{{ .Input.Id }}",
+                               Query:       nil,
+                               ResponseParser: func(res *http.Response) 
([]json.RawMessage, errors.Error) {
+                                       var data json.RawMessage
+                                       err := api.UnmarshalResponse(res, &data)
+                                       return []json.RawMessage{data}, err
+                               },
+                               AfterResponse: ignoreDeletedBuilds,
+                       },
+                       BuildInputIterator: func() (api.Iterator, errors.Error) 
{
+                               clauses := []dal.Clause{
+                                       dal.Select("id"),
+                                       dal.From(&models.CircleciWorkflow{}),
+                                       dal.Where("connection_id = ? AND 
project_slug = ? AND status IN ('running', 'on_hold', 'failing')", 
data.Options.ConnectionId, data.Options.ProjectSlug),
+                               }
 
-       collector, err := api.NewApiCollector(api.ApiCollectorArgs{
-               RawDataSubTaskArgs:    *rawDataSubTaskArgs,
-               ApiClient:             data.ApiClient,
-               UrlTemplate:           "/v2/pipeline/{{ .Input.Id }}/workflow",
-               Input:                 iterator,
-               GetNextPageCustomData: ExtractNextPageToken,
-               Query:                 BuildQueryParamsWithPageToken,
-               ResponseParser:        ParseCircleciPageTokenResp,
+                               db := taskCtx.GetDal()
+                               cursor, err := db.Cursor(clauses...)
+                               if err != nil {
+                                       return nil, err
+                               }
+                               return api.NewDalCursorIterator(db, cursor, 
reflect.TypeOf(models.CircleciWorkflow{}))
+                       },
+               },
        })
        if err != nil {
                logger.Error(err, "collect workflows error")

Reply via email to