This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new bc56e6c16 feat: add time filter for gitlab jobs (#4009)
bc56e6c16 is described below
commit bc56e6c1621715204393b8e277f355552656fb46
Author: Likyh <[email protected]>
AuthorDate: Thu Dec 22 11:44:16 2022 +0800
feat: add time filter for gitlab jobs (#4009)
* feat: add time filter for gitlab jobs
* fix: for review
* fix: for review
---
plugins/gitlab/tasks/job_collector.go | 2 +-
plugins/gitlab/tasks/shared.go | 33 +++++++++++++++++++++++++++++++++
plugins/helper/api_collector.go | 8 +++++++-
3 files changed, 41 insertions(+), 2 deletions(-)
diff --git a/plugins/gitlab/tasks/job_collector.go b/plugins/gitlab/tasks/job_collector.go
index 14362e132..14656d061 100644
--- a/plugins/gitlab/tasks/job_collector.go
+++ b/plugins/gitlab/tasks/job_collector.go
@@ -43,7 +43,7 @@ func CollectApiJobs(taskCtx core.SubTaskContext) errors.Error {
Incremental: false,
UrlTemplate: "projects/{{ .Params.ProjectId }}/jobs",
Query: GetQuery,
- ResponseParser: GetRawMessageFromResponse,
+ ResponseParser: GetRawMessageCreatedAtAfter(data.CreatedDateAfter),
AfterResponse: ignoreHTTPStatus403, // ignore 403 for CI/CD disable
})
diff --git a/plugins/gitlab/tasks/shared.go b/plugins/gitlab/tasks/shared.go
index f17a45eb0..e6926f088 100644
--- a/plugins/gitlab/tasks/shared.go
+++ b/plugins/gitlab/tasks/shared.go
@@ -27,6 +27,7 @@ import (
"net/url"
"reflect"
"strconv"
+ "time"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
@@ -74,6 +75,38 @@ func GetRawMessageFromResponse(res *http.Response) ([]json.RawMessage, errors.Er
return rawMessages, nil
}
// GetRawMessageCreatedAtAfter builds a response parser that filters the items
// of a page by their created_at field.
//
// createDateAfter is the exclusive lower bound: an item is kept only when
// createDateAfter is strictly before its created_at. A nil createDateAfter
// keeps every item.
//
// The returned parser yields the kept items. When a page contributes no kept
// item at all (an empty page included) it returns the empty slice together
// with helper.ErrFinishCollect; the api_collector.go hunk of this same commit
// treats that error as a stop signal rather than as a failure.
+func GetRawMessageCreatedAtAfter(createDateAfter *time.Time) func(res
*http.Response) ([]json.RawMessage, errors.Error) {
// ApiModel decodes only the created_at field; every other field of the item
// is ignored here and stays available in the raw message.
+ type ApiModel struct {
+ CreatedAt *helper.Iso8601Time `json:"created_at"`
+ }
+
+ return func(res *http.Response) ([]json.RawMessage, errors.Error) {
// Reuse the plain parser first, then filter its output below.
+ rawMessages, err := GetRawMessageFromResponse(res)
+ if err != nil {
+ return nil, errors.Default.Wrap(err, fmt.Sprintf("error
reading response from %s", res.Request.URL.String()))
+ }
// isFinish stays true only if the loop below keeps nothing from this page.
+ isFinish := true
+ filterRawMessages := []json.RawMessage{}
+ for _, rawMessage := range rawMessages {
+ apiModel := &ApiModel{}
+ err = errors.Convert(json.Unmarshal(rawMessage,
apiModel))
+ if err != nil {
+ return nil, err
+ }
// NOTE(review): CreatedAt is a pointer and is dereferenced through ToTime
// whenever createDateAfter is non-nil. If an item has a missing or null
// created_at the pointer stays nil; presumably ToTime does not guard against
// a nil receiver, so confirm against helper.Iso8601Time before relying on it.
+ if createDateAfter == nil ||
createDateAfter.Before(apiModel.CreatedAt.ToTime()) {
+ // only finish when all items are created
before `createDateAfter`
+ // because gitlab's order may not strict enough
+ isFinish = false
+ filterRawMessages = append(filterRawMessages,
rawMessage)
+ }
+ }
// Nothing on this page passed the filter: hand back the (empty) result and
// signal the caller to stop collecting.
+ if isFinish {
+ return filterRawMessages, helper.ErrFinishCollect
+ }
+ return filterRawMessages, nil
+ }
+}
+
func GetQuery(reqData *helper.RequestData) (url.Values, errors.Error) {
query := url.Values{}
query.Set("with_stats", "true")
diff --git a/plugins/helper/api_collector.go b/plugins/helper/api_collector.go
index 950ceaa68..50494bd4a 100644
--- a/plugins/helper/api_collector.go
+++ b/plugins/helper/api_collector.go
@@ -367,7 +367,12 @@ func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler func(int
// convert body to array of RawJSON
items, err := collector.args.ResponseParser(res)
if err != nil {
- return errors.Default.Wrap(err, fmt.Sprintf("error parsing response from %s", apiUrl))
+ if errors.Is(err, ErrFinishCollect) {
+ logger.Info("a fetch stop by parser, reqInput: #%d", reqData.Params)
+ handler = nil
+ } else {
+ return errors.Default.Wrap(err, fmt.Sprintf("error parsing response from %s", apiUrl))
+ }
}
// save to db
count := len(items)
@@ -394,6 +399,7 @@ func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler func(int
// increase progress only when it was not nested
collector.args.Ctx.IncProgress(1)
if handler != nil {
+ // trigger next fetch, but return if ErrFinishCollect got from ResponseParser
res.Body = io.NopCloser(bytes.NewBuffer(body))
return handler(count, body, res)
}