This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new edc2412f9 fix(circleci-plugin): pipeline collector time range (#7820)
edc2412f9 is described below
commit edc2412f9e3e29de2d38a806f4c94e712e68f05a
Author: Nick Williams <[email protected]>
AuthorDate: Thu Aug 15 07:40:04 2024 +0100
fix(circleci-plugin): pipeline collector time range (#7820)
* fix(circleci-plugin): only collect pipelines from after data time range
* fix(circleci-plugin): ignore 404 not found when collecting jobs or workflows
---
backend/plugins/circleci/tasks/job_collector.go | 1 +
.../plugins/circleci/tasks/pipeline_collector.go | 29 +++++++++++++++++++++-
backend/plugins/circleci/tasks/shared.go | 9 +++++++
.../plugins/circleci/tasks/workflow_collector.go | 1 +
4 files changed, 39 insertions(+), 1 deletion(-)
diff --git a/backend/plugins/circleci/tasks/job_collector.go b/backend/plugins/circleci/tasks/job_collector.go
index 06241a547..60fae1b9b 100644
--- a/backend/plugins/circleci/tasks/job_collector.go
+++ b/backend/plugins/circleci/tasks/job_collector.go
@@ -68,6 +68,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
GetNextPageCustomData: ExtractNextPageToken,
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
+ AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a job has been deleted
})
if err != nil {
logger.Error(err, "collect jobs error")
diff --git a/backend/plugins/circleci/tasks/pipeline_collector.go b/backend/plugins/circleci/tasks/pipeline_collector.go
index b7940e6c8..20055f894 100644
--- a/backend/plugins/circleci/tasks/pipeline_collector.go
+++ b/backend/plugins/circleci/tasks/pipeline_collector.go
@@ -18,6 +18,10 @@ limitations under the License.
package tasks
import (
+ "encoding/json"
+ "net/http"
+ "time"
+
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
@@ -38,6 +42,7 @@ var CollectPipelinesMeta = plugin.SubTaskMeta{
func CollectPipelines(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_PIPELINE_TABLE)
logger := taskCtx.GetLogger()
+ timeAfter := rawDataSubTaskArgs.Ctx.TaskContext().SyncPolicy().TimeAfter
logger.Info("collect pipelines")
collector, err := api.NewApiCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
@@ -46,7 +51,29 @@ func CollectPipelines(taskCtx plugin.SubTaskContext) errors.Error {
PageSize: int(data.Options.PageSize),
GetNextPageCustomData: ExtractNextPageToken,
Query: BuildQueryParamsWithPageToken,
- ResponseParser: ParseCircleciPageTokenResp,
+ ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
+ data := CircleciPageTokenResp[[]json.RawMessage]{}
+ err := api.UnmarshalResponse(res, &data)
+
+ if err != nil {
+ return nil, err
+ }
+ filteredItems := []json.RawMessage{}
+ for _, item := range data.Items {
+ var pipeline struct {
+ CreatedAt time.Time `json:"created_at"`
+ }
+ if err := json.Unmarshal(item, &pipeline); err != nil {
+ return nil, errors.Default.Wrap(err, "failed to unmarshal pipeline item")
+ }
+ if pipeline.CreatedAt.Before(*timeAfter) {
+ return filteredItems, api.ErrFinishCollect
+ }
+ filteredItems = append(filteredItems, item)
+
+ }
+ return filteredItems, nil
+ },
})
if err != nil {
logger.Error(err, "collect pipelines error")
diff --git a/backend/plugins/circleci/tasks/shared.go b/backend/plugins/circleci/tasks/shared.go
index 998b1419c..89b9321a7 100644
--- a/backend/plugins/circleci/tasks/shared.go
+++ b/backend/plugins/circleci/tasks/shared.go
@@ -121,3 +121,12 @@ func ParseCircleciPageTokenResp(res *http.Response) ([]json.RawMessage, errors.E
err := api.UnmarshalResponse(res, &data)
return data.Items, err
}
+
+func ignoreDeletedBuilds(res *http.Response) errors.Error {
+ // CircleCI API will return a 404 response for a workflow/job that has been deleted
+ // due to their data retention policy. We should ignore these errors.
+ if res.StatusCode == http.StatusNotFound {
+ return api.ErrIgnoreAndContinue
+ }
+ return nil
+}
diff --git a/backend/plugins/circleci/tasks/workflow_collector.go b/backend/plugins/circleci/tasks/workflow_collector.go
index cdb4e1c36..342efc823 100644
--- a/backend/plugins/circleci/tasks/workflow_collector.go
+++ b/backend/plugins/circleci/tasks/workflow_collector.go
@@ -68,6 +68,7 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext) errors.Error {
GetNextPageCustomData: ExtractNextPageToken,
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
+ AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted
})
if err != nil {
logger.Error(err, "collect workflows error")