This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new bc56e6c16 feat: add time filter for gitlab jobs (#4009)
bc56e6c16 is described below

commit bc56e6c1621715204393b8e277f355552656fb46
Author: Likyh <[email protected]>
AuthorDate: Thu Dec 22 11:44:16 2022 +0800

    feat: add time filter for gitlab jobs (#4009)
    
    * feat: add time filter for gitlab jobs
    
    * fix: for review
    
    * fix: for review
---
 plugins/gitlab/tasks/job_collector.go |  2 +-
 plugins/gitlab/tasks/shared.go        | 33 +++++++++++++++++++++++++++++++++
 plugins/helper/api_collector.go       |  8 +++++++-
 3 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/plugins/gitlab/tasks/job_collector.go 
b/plugins/gitlab/tasks/job_collector.go
index 14362e132..14656d061 100644
--- a/plugins/gitlab/tasks/job_collector.go
+++ b/plugins/gitlab/tasks/job_collector.go
@@ -43,7 +43,7 @@ func CollectApiJobs(taskCtx core.SubTaskContext) errors.Error 
{
                Incremental:        false,
                UrlTemplate:        "projects/{{ .Params.ProjectId }}/jobs",
                Query:              GetQuery,
-               ResponseParser:     GetRawMessageFromResponse,
+               ResponseParser:     
GetRawMessageCreatedAtAfter(data.CreatedDateAfter),
                AfterResponse:      ignoreHTTPStatus403, // ignore 403 for 
CI/CD disable
        })
 
diff --git a/plugins/gitlab/tasks/shared.go b/plugins/gitlab/tasks/shared.go
index f17a45eb0..e6926f088 100644
--- a/plugins/gitlab/tasks/shared.go
+++ b/plugins/gitlab/tasks/shared.go
@@ -27,6 +27,7 @@ import (
        "net/url"
        "reflect"
        "strconv"
+       "time"
 
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
@@ -74,6 +75,38 @@ func GetRawMessageFromResponse(res *http.Response) 
([]json.RawMessage, errors.Er
        return rawMessages, nil
 }
 
+// GetRawMessageCreatedAtAfter builds a response parser that keeps only the
+// items whose `created_at` is later than createDateAfter.
+//
+// The returned parser reads the page through GetRawMessageFromResponse and
+// decodes just the `created_at` field of every item. When createDateAfter is
+// nil no item is filtered out. When no item of the page is kept (which
+// includes an empty page) the parser returns helper.ErrFinishCollect together
+// with the empty result, signalling that collection can stop.
+func GetRawMessageCreatedAtAfter(createDateAfter *time.Time) func(res *http.Response) ([]json.RawMessage, errors.Error) {
+       type ApiModel struct {
+               CreatedAt *helper.Iso8601Time `json:"created_at"`
+       }
+
+       return func(res *http.Response) ([]json.RawMessage, errors.Error) {
+               rawMessages, err := GetRawMessageFromResponse(res)
+               if err != nil {
+                       return nil, errors.Default.Wrap(err, fmt.Sprintf("error reading response from %s", res.Request.URL.String()))
+               }
+               isFinish := true
+               filterRawMessages := make([]json.RawMessage, 0, len(rawMessages))
+               for _, rawMessage := range rawMessages {
+                       apiModel := &ApiModel{}
+                       err = errors.Convert(json.Unmarshal(rawMessage, apiModel))
+                       if err != nil {
+                               return nil, err
+                       }
+                       // `created_at` may be absent or null, which leaves CreatedAt nil;
+                       // keep such items rather than call ToTime on a nil pointer.
+                       if createDateAfter == nil || apiModel.CreatedAt == nil || createDateAfter.Before(apiModel.CreatedAt.ToTime()) {
+                               // only finish when every item of the page was created before
+                               // `createDateAfter`, because GitLab's ordering may not be strict enough
+                               isFinish = false
+                               filterRawMessages = append(filterRawMessages, rawMessage)
+                       }
+               }
+               if isFinish {
+                       return filterRawMessages, helper.ErrFinishCollect
+               }
+               return filterRawMessages, nil
+       }
+}
+
 func GetQuery(reqData *helper.RequestData) (url.Values, errors.Error) {
        query := url.Values{}
        query.Set("with_stats", "true")
diff --git a/plugins/helper/api_collector.go b/plugins/helper/api_collector.go
index 950ceaa68..50494bd4a 100644
--- a/plugins/helper/api_collector.go
+++ b/plugins/helper/api_collector.go
@@ -367,7 +367,12 @@ func (collector *ApiCollector) fetchAsync(reqData 
*RequestData, handler func(int
                // convert body to array of RawJSON
                items, err := collector.args.ResponseParser(res)
                if err != nil {
-                       return errors.Default.Wrap(err, fmt.Sprintf("error 
parsing response from %s", apiUrl))
+                       if errors.Is(err, ErrFinishCollect) {
+                               logger.Info("a fetch stop by parser, reqInput: 
#%d", reqData.Params)
+                               handler = nil
+                       } else {
+                               return errors.Default.Wrap(err, 
fmt.Sprintf("error parsing response from %s", apiUrl))
+                       }
                }
                // save to db
                count := len(items)
@@ -394,6 +399,7 @@ func (collector *ApiCollector) fetchAsync(reqData 
*RequestData, handler func(int
                // increase progress only when it was not nested
                collector.args.Ctx.IncProgress(1)
                if handler != nil {
+                       // trigger next fetch, but return if ErrFinishCollect 
got from ResponseParser
                        res.Body = io.NopCloser(bytes.NewBuffer(body))
                        return handler(count, body, res)
                }

Reply via email to