This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 80ec49b55 feat(circleci): Cherry Pick #7820 #7986 (#8341)
80ec49b55 is described below
commit 80ec49b55820fc6ac7f7a9cd29828c9d68810295
Author: John Ibsen <[email protected]>
AuthorDate: Tue Mar 18 21:36:00 2025 -0400
feat(circleci): Cherry Pick #7820 #7986 (#8341)
* feat(circleci-plugin): incremental data collection (#7986)
* feat(api_collector_stateful): handle case where response has records from
both before & after createdAfter of last collection
* feat(circleci-plugin): incremental collection for pipelines
* feat(api_collector_stateful): expose Input collector arg for
StatefulFinalizableEntity to collect data based on previous collection
* feat(circleci-plugin): incremental data collection for workflows
* feat(circleci-plugin): incremental data collection for jobs
* refactor(circleci-plugin): use common query param function
* refactor(circleci-plugin): use BuildQueryParamsWithPageToken func when
collecting unfinished job details
* fix(circleci-plugin): pipeline collector time range (#7820)
* fix(circleci-plugin): only collect pipelines from after data time range
* fix(circleci-plugin): ignore 404 not found when collecting jobs or
workflows
* Cleanup from bad merge
---------
Co-authored-by: Nick Williams <[email protected]>
Co-authored-by: John Ibsen <[email protected]>
---
.../pluginhelper/api/api_collector_stateful.go | 38 +++++++++-
backend/plugins/circleci/tasks/job_collector.go | 87 ++++++++++++++++------
.../plugins/circleci/tasks/pipeline_collector.go | 45 +++++++++--
backend/plugins/circleci/tasks/shared.go | 27 ++++++-
.../plugins/circleci/tasks/workflow_collector.go | 84 +++++++++++++++------
5 files changed, 220 insertions(+), 61 deletions(-)
diff --git a/backend/helpers/pluginhelper/api/api_collector_stateful.go
b/backend/helpers/pluginhelper/api/api_collector_stateful.go
index f4e2c67fe..985392f7b 100644
--- a/backend/helpers/pluginhelper/api/api_collector_stateful.go
+++ b/backend/helpers/pluginhelper/api/api_collector_stateful.go
@@ -139,10 +139,19 @@ func NewStatefulApiCollectorForFinalizableEntity(args
FinalizableApiCollectorArg
createdAfter := manager.CollectorStateManager.GetSince()
isIncremental := manager.CollectorStateManager.IsIncremental()
+ var inputIterator Iterator
+ if args.CollectNewRecordsByList.BuildInputIterator != nil {
+ inputIterator, err =
args.CollectNewRecordsByList.BuildInputIterator(isIncremental, createdAfter)
+ if err != nil {
+ return nil, err
+ }
+ }
+
// step 1: create a collector to collect newly added records
err = manager.InitCollector(ApiCollectorArgs{
ApiClient: args.ApiClient,
// common
+ Input: inputIterator,
Incremental: isIncremental,
UrlTemplate: args.CollectNewRecordsByList.UrlTemplate,
Query: func(reqData *RequestData) (url.Values, errors.Error) {
@@ -169,21 +178,41 @@ func NewStatefulApiCollectorForFinalizableEntity(args
FinalizableApiCollectorArg
// time filter or diff sync
if createdAfter != nil &&
args.CollectNewRecordsByList.GetCreated != nil {
- // if the first record of the page was created
before createdAfter, return emtpy set and stop
+ // if the first record of the page was created
before createdAfter and not a zero value, return empty set and stop
firstCreated, err :=
args.CollectNewRecordsByList.GetCreated(items[0])
if err != nil {
return nil, err
}
- if firstCreated.Before(*createdAfter) {
+ if firstCreated.Before(*createdAfter) &&
!firstCreated.IsZero() {
return nil, ErrFinishCollect
}
- // if the last record was created before
createdAfter, return records and stop
+
+ // If last record was created before
CreatedAfter, including a zero value, check each record individually
lastCreated, err :=
args.CollectNewRecordsByList.GetCreated(items[len(items)-1])
if err != nil {
return nil, err
}
if lastCreated.Before(*createdAfter) {
- return items, ErrFinishCollect
+ var validItems []json.RawMessage
+ // Only collect items that were created
after the last successful collection to prevent duplicates
+ for _, item := range items {
+ itemCreatedAt, err :=
args.CollectNewRecordsByList.GetCreated(item)
+ if err != nil {
+ return nil, err
+ }
+
+ if itemCreatedAt.IsZero() {
+ // If zero then
timestamp is null on the response - accept as valid for downstream processing
+ validItems =
append(validItems, item)
+ continue
+ }
+
+ if
itemCreatedAt.Before(*createdAfter) {
+ // Once we reach an
item that was created before the last successful collection, stop & return
+ return validItems,
ErrFinishCollect
+ }
+ validItems = append(validItems,
item)
+ }
}
}
return items, err
@@ -267,6 +296,7 @@ type FinalizableApiCollectorListArgs struct {
Concurrency int
// required for Undetermined
Strategy, number of concurrent requests
GetNextPageCustomData func(prevReqData *RequestData, prevPageResponse
*http.Response) (interface{}, errors.Error) // required for Sequential
Strategy, to extract the next page cursor from the given response
GetTotalPages func(res *http.Response, args *ApiCollectorArgs)
(int, errors.Error) // required for Determined Strategy,
to extract the total number of pages from the given response
+ BuildInputIterator func(isIncremental bool, createdAfter *time.Time)
(Iterator, errors.Error)
}
// FinalizableApiCollectorDetailArgs is the arguments for the detail collector
diff --git a/backend/plugins/circleci/tasks/job_collector.go
b/backend/plugins/circleci/tasks/job_collector.go
index 1f2f858f5..fd1d78286 100644
--- a/backend/plugins/circleci/tasks/job_collector.go
+++ b/backend/plugins/circleci/tasks/job_collector.go
@@ -18,12 +18,15 @@ limitations under the License.
package tasks
import (
+ "encoding/json"
+ "reflect"
+ "time"
+
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/circleci/models"
- "reflect"
)
const RAW_JOB_TABLE = "circleci_api_jobs"
@@ -43,30 +46,68 @@ func CollectJobs(taskCtx plugin.SubTaskContext)
errors.Error {
logger := taskCtx.GetLogger()
logger.Info("collect jobs")
- clauses := []dal.Clause{
- dal.Select("id, pipeline_id"),
- dal.From(&models.CircleciWorkflow{}),
- dal.Where("_tool_circleci_workflows.connection_id = ? and
_tool_circleci_workflows.project_slug = ? ", data.Options.ConnectionId,
data.Options.ProjectSlug),
- }
+ collector, err :=
api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
+ RawDataSubTaskArgs: *rawDataSubTaskArgs,
+ ApiClient: data.ApiClient,
+ CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
+ PageSize: int(data.Options.PageSize),
+ GetNextPageCustomData: ExtractNextPageToken,
+ BuildInputIterator: func(isIncremental bool,
createdAfter *time.Time) (api.Iterator, errors.Error) {
+ clauses := []dal.Clause{
+ dal.Select("id, pipeline_id"), //
pipeline_id not on individual job response but required for result
+ dal.From(&models.CircleciWorkflow{}),
+ dal.Where("connection_id = ? and
project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug),
+ }
- db := taskCtx.GetDal()
- cursor, err := db.Cursor(clauses...)
- if err != nil {
- return err
- }
- iterator, err := api.NewDalCursorIterator(db, cursor,
reflect.TypeOf(models.CircleciWorkflow{}))
- if err != nil {
- return err
- }
+ if isIncremental {
+ clauses = append(clauses,
dal.Where("created_date > ?", createdAfter))
+ }
+
+ db := taskCtx.GetDal()
+ cursor, err := db.Cursor(clauses...)
+ if err != nil {
+ return nil, err
+ }
+ return api.NewDalCursorIterator(db, cursor,
reflect.TypeOf(models.CircleciWorkflow{}))
+ },
+ FinalizableApiCollectorCommonArgs:
api.FinalizableApiCollectorCommonArgs{
+ UrlTemplate: "/v2/workflow/{{ .Input.Id
}}/job",
+ Query: BuildQueryParamsWithPageToken,
+ ResponseParser: ParseCircleciPageTokenResp,
+ AfterResponse: ignoreDeletedBuilds, // Ignore
the 404 response if a workflow has been deleted
+ },
+ GetCreated: func(item json.RawMessage) (time.Time,
errors.Error) {
+ var job struct { // Individual job response
lacks created_at field, so have to use started_at
+ CreatedAt time.Time `json:"started_at"`
// This will be null in some cases (e.g. queued, not_running, blocked)
+ }
+ if err := json.Unmarshal(item, &job); err !=
nil {
+ return time.Time{},
errors.Default.Wrap(err, "failed to unmarshal job")
+ }
+ return job.CreatedAt, nil
+ },
+ },
+ CollectUnfinishedDetails:
&api.FinalizableApiCollectorDetailArgs{
+ FinalizableApiCollectorCommonArgs:
api.FinalizableApiCollectorCommonArgs{
+ UrlTemplate: "/v2/workflow/{{ .Input.Id
}}/job", // The individual job endpoint has different fields so need to
recollect all jobs for a workflow
+ Query: BuildQueryParamsWithPageToken,
+ ResponseParser: ParseCircleciPageTokenResp,
+ AfterResponse: ignoreDeletedBuilds,
+ },
+ BuildInputIterator: func() (api.Iterator, errors.Error)
{
+ clauses := []dal.Clause{
+ dal.Select("DISTINCT workflow_id"), //
Only need to recollect jobs for a workflow once
+ dal.From(&models.CircleciJob{}),
+ dal.Where("connection_id = ? AND
project_slug = ? AND status IN ('running', 'not_running', 'queued',
'on_hold')", data.Options.ConnectionId, data.Options.ProjectSlug),
+ }
- collector, err := api.NewApiCollector(api.ApiCollectorArgs{
- RawDataSubTaskArgs: *rawDataSubTaskArgs,
- ApiClient: data.ApiClient,
- UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job",
- Input: iterator,
- GetNextPageCustomData: ExtractNextPageToken,
- Query: BuildQueryParamsWithPageToken,
- ResponseParser: ParseCircleciPageTokenResp,
+ db := taskCtx.GetDal()
+ cursor, err := db.Cursor(clauses...)
+ if err != nil {
+ return nil, err
+ }
+ return api.NewDalCursorIterator(db, cursor,
reflect.TypeOf(models.CircleciJob{}))
+ },
+ },
})
if err != nil {
logger.Error(err, "collect jobs error")
diff --git a/backend/plugins/circleci/tasks/pipeline_collector.go
b/backend/plugins/circleci/tasks/pipeline_collector.go
index b7940e6c8..2dc8da4f3 100644
--- a/backend/plugins/circleci/tasks/pipeline_collector.go
+++ b/backend/plugins/circleci/tasks/pipeline_collector.go
@@ -18,6 +18,9 @@ limitations under the License.
package tasks
import (
+ "encoding/json"
+ "net/http"
+
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
@@ -38,15 +41,41 @@ var CollectPipelinesMeta = plugin.SubTaskMeta{
func CollectPipelines(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_PIPELINE_TABLE)
logger := taskCtx.GetLogger()
+ timeAfter := rawDataSubTaskArgs.Ctx.TaskContext().SyncPolicy().TimeAfter
logger.Info("collect pipelines")
- collector, err := api.NewApiCollector(api.ApiCollectorArgs{
- RawDataSubTaskArgs: *rawDataSubTaskArgs,
- ApiClient: data.ApiClient,
- UrlTemplate: "/v2/project/{{ .Params.ProjectSlug
}}/pipeline",
- PageSize: int(data.Options.PageSize),
- GetNextPageCustomData: ExtractNextPageToken,
- Query: BuildQueryParamsWithPageToken,
- ResponseParser: ParseCircleciPageTokenResp,
+ collector, err :=
api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
+ RawDataSubTaskArgs: *rawDataSubTaskArgs,
+ ApiClient: data.ApiClient,
+ CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
+ PageSize: int(data.Options.PageSize),
+ GetNextPageCustomData: ExtractNextPageToken,
+ FinalizableApiCollectorCommonArgs:
api.FinalizableApiCollectorCommonArgs{
+ UrlTemplate: "/v2/project/{{
.Params.ProjectSlug }}/pipeline",
+ Query: BuildQueryParamsWithPageToken,
+ ResponseParser: func(res *http.Response)
([]json.RawMessage, errors.Error) {
+ data :=
CircleciPageTokenResp[[]json.RawMessage]{}
+ err := api.UnmarshalResponse(res, &data)
+
+ if err != nil {
+ return nil, err
+ }
+ filteredItems := []json.RawMessage{}
+ for _, item := range data.Items {
+ pipelineCreatedAt, err :=
extractCreatedAt(item)
+
+ if err != nil {
+ return nil, err
+ }
+ if
pipelineCreatedAt.Before(*timeAfter) {
+ return filteredItems,
api.ErrFinishCollect
+ }
+ filteredItems =
append(filteredItems, item)
+ }
+ return filteredItems, nil
+ },
+ },
+ GetCreated: extractCreatedAt,
+ },
})
if err != nil {
logger.Error(err, "collect pipelines error")
diff --git a/backend/plugins/circleci/tasks/shared.go
b/backend/plugins/circleci/tasks/shared.go
index dd2c4944a..6e235ecd6 100644
--- a/backend/plugins/circleci/tasks/shared.go
+++ b/backend/plugins/circleci/tasks/shared.go
@@ -19,14 +19,16 @@ package tasks
import (
"encoding/json"
+ "net/http"
+ "net/url"
+ "time"
+
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models/domainlayer/didgen"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/circleci/models"
- "net/http"
- "net/url"
)
var accountIdGen *didgen.DomainIdGenerator
@@ -107,7 +109,7 @@ func ExtractNextPageToken(prevReqData *api.RequestData,
prevPageResponse *http.R
return res.NextPageToken, nil
}
-func BuildQueryParamsWithPageToken(reqData *api.RequestData) (url.Values,
errors.Error) {
+func BuildQueryParamsWithPageToken(reqData *api.RequestData, _ *time.Time)
(url.Values, errors.Error) {
query := url.Values{}
if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" {
query.Set("page-token", pageToken)
@@ -120,3 +122,22 @@ func ParseCircleciPageTokenResp(res *http.Response)
([]json.RawMessage, errors.E
err := api.UnmarshalResponse(res, &data)
return data.Items, err
}
+
+func ignoreDeletedBuilds(res *http.Response) errors.Error {
+ // CircleCI API will return a 404 response for a workflow/job that has
been deleted
+ // due to their data retention policy. We should ignore these errors.
+ if res.StatusCode == http.StatusNotFound {
+ return api.ErrIgnoreAndContinue
+ }
+ return nil
+}
+
+func extractCreatedAt(item json.RawMessage) (time.Time, errors.Error) {
+ var entity struct {
+ CreatedAt time.Time `json:"created_at"`
+ }
+ if err := json.Unmarshal(item, &entity); err != nil {
+ return time.Time{}, errors.Default.Wrap(err, "failed to
unmarshal item")
+ }
+ return entity.CreatedAt, nil
+}
diff --git a/backend/plugins/circleci/tasks/workflow_collector.go
b/backend/plugins/circleci/tasks/workflow_collector.go
index 8234eceac..f0f3aebe5 100644
--- a/backend/plugins/circleci/tasks/workflow_collector.go
+++ b/backend/plugins/circleci/tasks/workflow_collector.go
@@ -18,12 +18,16 @@ limitations under the License.
package tasks
import (
+ "encoding/json"
+ "net/http"
+ "reflect"
+ "time"
+
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/plugins/circleci/models"
- "reflect"
)
const RAW_WORKFLOW_TABLE = "circleci_api_workflows"
@@ -43,30 +47,64 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext)
errors.Error {
logger := taskCtx.GetLogger()
logger.Info("collect workflows")
- clauses := []dal.Clause{
- dal.Select("id"),
- dal.From(&models.CircleciPipeline{}),
- dal.Where("_tool_circleci_pipelines.connection_id = ? and
_tool_circleci_pipelines.project_slug = ? ", data.Options.ConnectionId,
data.Options.ProjectSlug),
- }
+ collector, err :=
api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
+ RawDataSubTaskArgs: *rawDataSubTaskArgs,
+ ApiClient: data.ApiClient,
+ CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
+ PageSize: int(data.Options.PageSize),
+ GetNextPageCustomData: ExtractNextPageToken,
+ BuildInputIterator: func(isIncremental bool,
createdAfter *time.Time) (api.Iterator, errors.Error) {
+ clauses := []dal.Clause{
+ dal.Select("id"),
+ dal.From(&models.CircleciPipeline{}),
+ dal.Where("connection_id = ? AND
project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug),
+ }
- db := taskCtx.GetDal()
- cursor, err := db.Cursor(clauses...)
- if err != nil {
- return err
- }
- iterator, err := api.NewDalCursorIterator(db, cursor,
reflect.TypeOf(models.CircleciPipeline{}))
- if err != nil {
- return err
- }
+ if isIncremental {
+ clauses = append(clauses,
dal.Where("created_date > ?", createdAfter))
+ }
+
+ db := taskCtx.GetDal()
+ cursor, err := db.Cursor(clauses...)
+ if err != nil {
+ return nil, err
+ }
+ return api.NewDalCursorIterator(db, cursor,
reflect.TypeOf(models.CircleciPipeline{}))
+ },
+ FinalizableApiCollectorCommonArgs:
api.FinalizableApiCollectorCommonArgs{
+ UrlTemplate: "/v2/pipeline/{{ .Input.Id
}}/workflow",
+ Query: BuildQueryParamsWithPageToken,
+ ResponseParser: ParseCircleciPageTokenResp,
+ AfterResponse: ignoreDeletedBuilds, // Ignore
the 404 response if a workflow has been deleted
+ },
+ GetCreated: extractCreatedAt,
+ },
+ CollectUnfinishedDetails:
&api.FinalizableApiCollectorDetailArgs{
+ FinalizableApiCollectorCommonArgs:
api.FinalizableApiCollectorCommonArgs{
+ UrlTemplate: "/v2/workflow/{{ .Input.Id }}",
+ Query: nil,
+ ResponseParser: func(res *http.Response)
([]json.RawMessage, errors.Error) {
+ var data json.RawMessage
+ err := api.UnmarshalResponse(res, &data)
+ return []json.RawMessage{data}, err
+ },
+ AfterResponse: ignoreDeletedBuilds,
+ },
+ BuildInputIterator: func() (api.Iterator, errors.Error)
{
+ clauses := []dal.Clause{
+ dal.Select("id"),
+ dal.From(&models.CircleciWorkflow{}),
+ dal.Where("connection_id = ? AND
project_slug = ? AND status IN ('running', 'on_hold', 'failing')",
data.Options.ConnectionId, data.Options.ProjectSlug),
+ }
- collector, err := api.NewApiCollector(api.ApiCollectorArgs{
- RawDataSubTaskArgs: *rawDataSubTaskArgs,
- ApiClient: data.ApiClient,
- UrlTemplate: "/v2/pipeline/{{ .Input.Id }}/workflow",
- Input: iterator,
- GetNextPageCustomData: ExtractNextPageToken,
- Query: BuildQueryParamsWithPageToken,
- ResponseParser: ParseCircleciPageTokenResp,
+ db := taskCtx.GetDal()
+ cursor, err := db.Cursor(clauses...)
+ if err != nil {
+ return nil, err
+ }
+ return api.NewDalCursorIterator(db, cursor,
reflect.TypeOf(models.CircleciWorkflow{}))
+ },
+ },
})
if err != nil {
logger.Error(err, "collect workflows error")