This is an automated email from the ASF dual-hosted git repository.

zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new f649d1de2 feat: enhance timeFilter/diffSync support for jenkins build 
(#4700)
f649d1de2 is described below

commit f649d1de211d6374d79c20933f37bba3f0693177
Author: abeizn <[email protected]>
AuthorDate: Fri Mar 17 20:51:28 2023 +0800

    feat: enhance timeFilter/diffSync support for jenkins build (#4700)
    
    * feat: enhance timeFilter/diffSync support for jenkins build
    
    * feat: linux stamp convertor
    
    * feat: linux stamp convertor
---
 backend/plugins/jenkins/tasks/build_collector.go | 103 +++++++++++++++++------
 1 file changed, 77 insertions(+), 26 deletions(-)

diff --git a/backend/plugins/jenkins/tasks/build_collector.go 
b/backend/plugins/jenkins/tasks/build_collector.go
index 3a64b0086..8a72787f4 100644
--- a/backend/plugins/jenkins/tasks/build_collector.go
+++ b/backend/plugins/jenkins/tasks/build_collector.go
@@ -20,12 +20,17 @@ package tasks
 import (
        "encoding/json"
        "fmt"
+       "io"
        "net/http"
        "net/url"
+       "reflect"
+       "time"
 
+       "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
-       "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+       helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+       "github.com/apache/incubator-devlake/plugins/jenkins/models"
 )
 
 const RAW_BUILD_TABLE = "jenkins_api_builds"
@@ -43,11 +48,16 @@ type SimpleJob struct {
        Path string
 }
 
+type SimpleJenkinsApiBuild struct {
+       Number    int64
+       Timestamp int64 `json:"timestamp"`
+}
+
 func CollectApiBuilds(taskCtx plugin.SubTaskContext) errors.Error {
        data := taskCtx.GetData().(*JenkinsTaskData)
-
-       collector, err := api.NewApiCollector(api.ApiCollectorArgs{
-               RawDataSubTaskArgs: api.RawDataSubTaskArgs{
+       db := taskCtx.GetDal()
+       collector, err := 
helper.NewStatefulApiCollectorForFinalizableEntity(helper.FinalizableApiCollectorArgs{
+               RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
                        Params: JenkinsApiParams{
                                ConnectionId: data.Options.ConnectionId,
                                FullName:     data.Options.JobFullName,
@@ -55,29 +65,70 @@ func CollectApiBuilds(taskCtx plugin.SubTaskContext) 
errors.Error {
                        Ctx:   taskCtx,
                        Table: RAW_BUILD_TABLE,
                },
-               ApiClient:   data.ApiClient,
-               PageSize:    100,
-               UrlTemplate: fmt.Sprintf("%sjob/%s/api/json", 
data.Options.JobPath, data.Options.JobName),
-               /*
-                       (Optional) Return query string for request, or you can 
plug them into UrlTemplate directly
-               */
-               Query: func(reqData *api.RequestData) (url.Values, 
errors.Error) {
-                       query := url.Values{}
-                       treeValue := fmt.Sprintf(
-                               
"allBuilds[number,timestamp,duration,building,estimatedDuration,fullDisplayName,result,actions[lastBuiltRevision[SHA1,branch[name]],remoteUrls,mercurialRevisionNumber,causes[*]],changeSet[kind,revisions[revision]]]{%d,%d}",
-                               reqData.Pager.Skip, 
reqData.Pager.Skip+reqData.Pager.Size)
-                       query.Set("tree", treeValue)
-                       return query, nil
+               ApiClient: data.ApiClient,
+               TimeAfter: data.TimeAfter,
+               CollectNewRecordsByList: helper.FinalizableApiCollectorListArgs{
+                       PageSize:    100,
+                       Concurrency: 10,
+                       FinalizableApiCollectorCommonArgs: 
helper.FinalizableApiCollectorCommonArgs{
+                               UrlTemplate: fmt.Sprintf("%sjob/%s/api/json", 
data.Options.JobPath, data.Options.JobName),
+                               Query: func(reqData *helper.RequestData, 
createdAfter *time.Time) (url.Values, errors.Error) {
+                                       query := url.Values{}
+                                       treeValue := fmt.Sprintf(
+                                               
"allBuilds[timestamp,number,duration,building,estimatedDuration,fullDisplayName,result,actions[lastBuiltRevision[SHA1,branch[name]],remoteUrls,mercurialRevisionNumber,causes[*]],changeSet[kind,revisions[revision]]]{%d,%d}",
+                                               reqData.Pager.Skip, 
reqData.Pager.Skip+reqData.Pager.Size)
+                                       query.Set("tree", treeValue)
+                                       return query, nil
+                               },
+                               ResponseParser: func(res *http.Response) 
([]json.RawMessage, errors.Error) {
+                                       var data struct {
+                                               Builds []json.RawMessage 
`json:"allBuilds"`
+                                       }
+                                       err := helper.UnmarshalResponse(res, 
&data)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       return data.Builds, nil
+                               },
+                       },
+                       GetCreated: func(item json.RawMessage) (time.Time, 
errors.Error) {
+                               b := &SimpleJenkinsApiBuild{}
+                               err := json.Unmarshal(item, b)
+                               if err != nil {
+                                       return time.Time{}, 
errors.BadInput.Wrap(err, "failed to unmarshal jenkins build")
+                               }
+                               seconds := b.Timestamp / 1000
+                               nanos := (b.Timestamp % 1000) * 1000000
+                               return time.Unix(seconds, nanos), nil
+                       },
                },
-               ResponseParser: func(res *http.Response) ([]json.RawMessage, 
errors.Error) {
-                       var data struct {
-                               Builds []json.RawMessage `json:"allBuilds"`
-                       }
-                       err := api.UnmarshalResponse(res, &data)
-                       if err != nil {
-                               return nil, err
-                       }
-                       return data.Builds, nil
+               CollectUnfinishedDetails: 
helper.FinalizableApiCollectorDetailArgs{
+                       BuildInputIterator: func() (helper.Iterator, 
errors.Error) {
+                               cursor, err := db.Cursor(
+                                       dal.Select("number"),
+                                       dal.From(&models.JenkinsBuild{}),
+                                       dal.Where(
+                                               "full_name = ? AND 
connection_id = ?",
+                                               data.Options.JobFullName, 
data.Options.ConnectionId,
+                                       ),
+                               )
+                               if err != nil {
+                                       return nil, err
+                               }
+                               return helper.NewDalCursorIterator(db, cursor, 
reflect.TypeOf(SimpleJenkinsApiBuild{}))
+                       },
+                       FinalizableApiCollectorCommonArgs: 
helper.FinalizableApiCollectorCommonArgs{
+                               UrlTemplate: fmt.Sprintf("%sjob/%s/{{ 
.Input.Number 
}}/api/json?tree=number,url,result,timestamp,id,duration,estimatedDuration,building",
+                                       data.Options.JobPath, 
data.Options.JobName),
+                               ResponseParser: func(res *http.Response) 
([]json.RawMessage, errors.Error) {
+                                       body, err := io.ReadAll(res.Body)
+                                       if err != nil {
+                                               return nil, errors.Convert(err)
+                                       }
+                                       res.Body.Close()
+                                       return []json.RawMessage{body}, nil
+                               },
+                       },
                },
        })
 

Reply via email to