This is an automated email from the ASF dual-hosted git repository.

zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 63d0062ad feat: finalizable collector helper / github pr support 
timeFilter/dif… (#4478)
63d0062ad is described below

commit 63d0062ad60c1960fdfb333fa970664772fde2af
Author: Klesh Wong <[email protected]>
AuthorDate: Wed Feb 22 15:59:05 2023 +0800

    feat: finalizable collector helper / github pr support timeFilter/dif… 
(#4478)
    
    * feat: finalizable collector helper / github pr support timeFilter/diffSync
    
    * fix: GetCreated should be optional for APIs that support filtering by 
createdAt
    
    * fix: typo and alias
---
 .../pluginhelper/api/api_collector_with_state.go   | 212 ++++++++++++++++++---
 backend/plugins/github/tasks/pr_collector.go       | 121 ++++++++----
 .../github_graphql/tasks/check_run_collector.go    |   2 +-
 3 files changed, 274 insertions(+), 61 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go 
b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index 3d8b11eda..c506468cb 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -18,18 +18,24 @@ limitations under the License.
 package api
 
 import (
+       "encoding/json"
+       "net/http"
+       "net/url"
        "time"
 
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/models"
+       "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/helpers/pluginhelper/common"
 )
 
 // ApiCollectorStateManager save collector state in framework table
 type ApiCollectorStateManager struct {
        RawDataSubTaskArgs
-       *ApiCollector
-       *GraphqlCollector
+       // *ApiCollector
+       // *GraphqlCollector
+       subtasks    []plugin.SubTask
        LatestState models.CollectorLatestState
        // Deprecating(timeAfter): to be deleted
        CreatedDateAfter *time.Time
@@ -94,40 +100,36 @@ func (m *ApiCollectorStateManager) IsIncremental() bool {
 }
 
 // InitCollector init the embedded collector
-func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs) (err 
errors.Error) {
+func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs) 
errors.Error {
        args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
-       m.ApiCollector, err = NewApiCollector(args)
-       return err
+       apiCollector, err := NewApiCollector(args)
+       if err != nil {
+               return err
+       }
+       m.subtasks = append(m.subtasks, apiCollector)
+       return nil
 }
 
 // InitGraphQLCollector init the embedded collector
-func (m *ApiCollectorStateManager) InitGraphQLCollector(args 
GraphqlCollectorArgs) (err errors.Error) {
+func (m *ApiCollectorStateManager) InitGraphQLCollector(args 
GraphqlCollectorArgs) errors.Error {
        args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
-       m.GraphqlCollector, err = NewGraphqlCollector(args)
-       return err
-}
-
-// Execute the embedded collector and record execute state
-func (m ApiCollectorStateManager) Execute() errors.Error {
-       err := m.ApiCollector.Execute()
+       graphqlCollector, err := NewGraphqlCollector(args)
        if err != nil {
                return err
        }
-
-       return m.updateState()
+       m.subtasks = append(m.subtasks, graphqlCollector)
+       return nil
 }
 
-// ExecuteGraphQL the embedded collector and record execute state
-func (m ApiCollectorStateManager) ExecuteGraphQL() errors.Error {
-       err := m.GraphqlCollector.Execute()
-       if err != nil {
-               return err
+// Execute the embedded collector and record execute state
+func (m *ApiCollectorStateManager) Execute() errors.Error {
+       for _, subtask := range m.subtasks {
+               err := subtask.Execute()
+               if err != nil {
+                       return err
+               }
        }
 
-       return m.updateState()
-}
-
-func (m ApiCollectorStateManager) updateState() errors.Error {
        db := m.Ctx.GetDal()
        m.LatestState.LatestSuccessStart = &m.ExecuteStart
        // Deprecating(timeAfter): to be deleted
@@ -135,3 +137,165 @@ func (m ApiCollectorStateManager) updateState() 
errors.Error {
        m.LatestState.TimeAfter = m.TimeAfter
        return db.CreateOrUpdate(&m.LatestState)
 }
+
+// NewStatefulApiCollectorForFinalizableEntity aims to add timeFilter/diffSync 
support for
+// APIs that do NOT support filtering data by updated date. However, it comes 
with the
+// following constraints:
+//  1. The entity is a short-lived object or it is likely to be irrelevant
+//     a. ci/cd pipelines are short-lived objects
+//     b. a pull request might take a year to be closed, or never be closed, but it is 
likely irrelevant
+//  2. The entity must be Finalizable: when it is finalized, no modification 
forever
+//  3. The API must fit one of the following traits:
+//     a. it supports filtering by Created Date, in this case, you may specify 
the `GetTotalPages`
+//     option to fetch data with Determined Strategy if possible.
+//     b. or sorting by Created Date in Descending order, in this case, you 
must use `Concurrency`
+//     or `GetNextPageCustomData` instead of `GetTotalPages` for Undetermined 
Strategy since we have
+//     to stop the process in the middle.
+func NewStatefulApiCollectorForFinalizableEntity(args 
FinalizableApiCollectorArgs) (plugin.SubTask, errors.Error) {
+       // create a manager which can execute multiple collectors but acts as 
a single subtask to callers
+       manager, err := NewStatefulApiCollector(RawDataSubTaskArgs{
+               Ctx:    args.Ctx,
+               Params: args.Params,
+               Table:  args.Table,
+       }, args.TimeAfter)
+       if err != nil {
+               return nil, err
+       }
+
+       // prepare the basic variables
+       var isIncremental = manager.IsIncremental()
+       var createdAfter *time.Time
+       if isIncremental {
+               createdAfter = manager.LatestState.LatestSuccessStart
+       } else {
+               createdAfter = manager.TimeAfter
+       }
+
+       // step 1: create a collector to collect newly added records
+       err = manager.InitCollector(ApiCollectorArgs{
+               ApiClient: args.ApiClient,
+               // common
+               Incremental: isIncremental,
+               UrlTemplate: args.CollectNewRecordsByList.UrlTemplate,
+               Query: func(reqData *RequestData) (url.Values, errors.Error) {
+                       if args.CollectNewRecordsByList.Query != nil {
+                               return 
args.CollectNewRecordsByList.Query(reqData, createdAfter)
+                       }
+                       return nil, nil
+               },
+               Header: func(reqData *RequestData) (http.Header, errors.Error) {
+                       if args.CollectNewRecordsByList.Header != nil {
+                               return 
args.CollectNewRecordsByList.Header(reqData, createdAfter)
+                       }
+                       return nil, nil
+               },
+               MinTickInterval: args.CollectNewRecordsByList.MinTickInterval,
+               ResponseParser: func(res *http.Response) ([]json.RawMessage, 
errors.Error) {
+                       items, err := 
args.CollectNewRecordsByList.ResponseParser(res)
+                       if err != nil {
+                               return nil, err
+                       }
+                       if len(items) == 0 {
+                               return nil, nil
+                       }
+
+                       // time filter or diff sync
+                       if createdAfter != nil && 
args.CollectNewRecordsByList.GetCreated != nil {
+                               // if the first record of the page was created 
before createdAfter, return an empty set and stop
+                               firstCreated, err := 
args.CollectNewRecordsByList.GetCreated(items[0])
+                               if err != nil {
+                                       return nil, err
+                               }
+                               if firstCreated.Before(*createdAfter) {
+                                       return nil, ErrFinishCollect
+                               }
+                               // if the last record was created before 
createdAfter, return records and stop
+                               lastCreated, err := 
args.CollectNewRecordsByList.GetCreated(items[len(items)-1])
+                               if err != nil {
+                                       return nil, err
+                               }
+                               if lastCreated.Before(*createdAfter) {
+                                       return items, ErrFinishCollect
+                               }
+                       }
+                       return items, err
+               },
+               AfterResponse: args.CollectNewRecordsByList.AfterResponse,
+               RequestBody:   args.CollectNewRecordsByList.RequestBody,
+               Method:        args.CollectNewRecordsByList.Method,
+               // pagination
+               PageSize:              args.CollectNewRecordsByList.PageSize,
+               Concurrency:           args.CollectNewRecordsByList.Concurrency,
+               GetNextPageCustomData: 
args.CollectNewRecordsByList.GetNextPageCustomData,
+               // GetTotalPages:         
args.CollectNewRecordsByList.GetTotalPages,
+       })
+
+       if err != nil {
+               return nil, err
+       }
+
+       // step 2: create another collector to collect updated records
+       // TODO: this creates the cursor before the previous step gets executed, which 
is too early; to be optimized
+       input, err := args.CollectUnfinishedDetails.BuildInputIterator()
+       if err != nil {
+               return nil, err
+       }
+       err = manager.InitCollector(ApiCollectorArgs{
+               ApiClient: args.ApiClient,
+               // common
+               Incremental: true,
+               Input:       input,
+               UrlTemplate: args.CollectUnfinishedDetails.UrlTemplate,
+               Query: func(reqData *RequestData) (url.Values, errors.Error) {
+                       if args.CollectUnfinishedDetails.Query != nil {
+                               return 
args.CollectUnfinishedDetails.Query(reqData, createdAfter)
+                       }
+                       return nil, nil
+               },
+               Header: func(reqData *RequestData) (http.Header, errors.Error) {
+                       if args.CollectUnfinishedDetails.Header != nil {
+                               return 
args.CollectUnfinishedDetails.Header(reqData, createdAfter)
+                       }
+                       return nil, nil
+               },
+               MinTickInterval: args.CollectUnfinishedDetails.MinTickInterval,
+               ResponseParser:  args.CollectUnfinishedDetails.ResponseParser,
+               AfterResponse:   args.CollectUnfinishedDetails.AfterResponse,
+               RequestBody:     args.CollectUnfinishedDetails.RequestBody,
+               Method:          args.CollectUnfinishedDetails.Method,
+       })
+       return manager, err
+}
+
+type FinalizableApiCollectorArgs struct {
+       RawDataSubTaskArgs
+       ApiClient                RateLimitedApiClient
+       TimeAfter                *time.Time // leave it nil to disable the time 
filter
+       CollectNewRecordsByList  FinalizableApiCollectorListArgs
+       CollectUnfinishedDetails FinalizableApiCollectorDetailArgs
+}
+
+type FinalizableApiCollectorCommonArgs struct {
+       UrlTemplate     string `comment:"GoTemplate for API url"`
+       Query           func(reqData *RequestData, createdAfter *time.Time) 
(url.Values, errors.Error)
+       Header          func(reqData *RequestData, createdAfter *time.Time) 
(http.Header, errors.Error)
+       MinTickInterval *time.Duration
+       ResponseParser  func(res *http.Response) ([]json.RawMessage, 
errors.Error)
+       AfterResponse   common.ApiClientAfterResponse
+       RequestBody     func(reqData *RequestData) map[string]interface{}
+       Method          string
+}
+type FinalizableApiCollectorListArgs struct {
+       // optional, leave it `nil` if the API supports filtering by created 
date (don't forget to set the Query)
+       GetCreated func(item json.RawMessage) (time.Time, errors.Error)
+       FinalizableApiCollectorCommonArgs
+       Concurrency           int
+       PageSize              int
+       GetNextPageCustomData func(prevReqData *RequestData, prevPageResponse 
*http.Response) (interface{}, errors.Error)
+       // need to consider the data missing problem: what if new data gets 
created during collection?
+       // GetTotalPages         func(res *http.Response, args 
*ApiCollectorArgs) (int, errors.Error)
+}
+type FinalizableApiCollectorDetailArgs struct {
+       FinalizableApiCollectorCommonArgs
+       BuildInputIterator func() (Iterator, errors.Error)
+}
diff --git a/backend/plugins/github/tasks/pr_collector.go 
b/backend/plugins/github/tasks/pr_collector.go
index 50cde330c..4456a9d3b 100644
--- a/backend/plugins/github/tasks/pr_collector.go
+++ b/backend/plugins/github/tasks/pr_collector.go
@@ -20,12 +20,17 @@ package tasks
 import (
        "encoding/json"
        "fmt"
+       "io"
        "net/http"
        "net/url"
+       "reflect"
+       "time"
 
+       "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
        helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+       "github.com/apache/incubator-devlake/plugins/github/models"
 )
 
 const RAW_PULL_REQUEST_TABLE = "github_api_pull_requests"
@@ -38,50 +43,94 @@ var CollectApiPullRequestsMeta = plugin.SubTaskMeta{
        DomainTypes:      []string{plugin.DOMAIN_TYPE_CROSS, 
plugin.DOMAIN_TYPE_CODE_REVIEW},
 }
 
-func CollectApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error {
-       data := taskCtx.GetData().(*GithubTaskData)
-       collectorWithState, err := 
helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
-               Ctx: taskCtx,
-               Params: GithubApiParams{
-                       ConnectionId: data.Options.ConnectionId,
-                       Name:         data.Options.Name,
-               },
-               Table: RAW_PULL_REQUEST_TABLE,
-       }, data.TimeAfter)
-       if err != nil {
-               return err
-       }
-
-       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-               ApiClient:   data.ApiClient,
-               PageSize:    100,
-               Incremental: false,
+type SimpleGithubPr struct {
+       GithubId int64
+}
 
-               UrlTemplate: "repos/{{ .Params.Name }}/pulls",
+type SimpleGithubApiPr struct {
+       CreatedAt time.Time `json:"created_at"`
+}
 
-               Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
-                       query := url.Values{}
-                       query.Set("state", "all")
-                       query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
-                       query.Set("direction", "asc")
-                       query.Set("per_page", fmt.Sprintf("%v", 
reqData.Pager.Size))
+func CollectApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error {
+       // pull requests are Finalizable: they can't be reopened once closed
+       data := taskCtx.GetData().(*GithubTaskData)
+       db := taskCtx.GetDal()
 
-                       return query, nil
+       collector, err := 
helper.NewStatefulApiCollectorForFinalizableEntity(helper.FinalizableApiCollectorArgs{
+               RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+                       Ctx: taskCtx,
+                       Params: GithubApiParams{
+                               ConnectionId: data.Options.ConnectionId,
+                               Name:         data.Options.Name,
+                       },
+                       Table: RAW_PULL_REQUEST_TABLE,
                },
-
-               GetTotalPages: GetTotalPagesFromResponse,
-               ResponseParser: func(res *http.Response) ([]json.RawMessage, 
errors.Error) {
-                       var items []json.RawMessage
-                       err := helper.UnmarshalResponse(res, &items)
-                       if err != nil {
-                               return nil, err
-                       }
-                       return items, nil
+               ApiClient: data.ApiClient,
+               TimeAfter: data.TimeAfter, // set to nil to disable timeFilter
+               CollectNewRecordsByList: helper.FinalizableApiCollectorListArgs{
+                       PageSize:    100,
+                       Concurrency: 10,
+                       FinalizableApiCollectorCommonArgs: 
helper.FinalizableApiCollectorCommonArgs{
+                               UrlTemplate: "repos/{{ .Params.Name }}/pulls",
+                               Query: func(reqData *helper.RequestData, 
createdAfter *time.Time) (url.Values, errors.Error) {
+                                       query := url.Values{}
+                                       query.Set("state", "all")
+                                       query.Set("direction", "desc")
+                                       query.Set("page", fmt.Sprintf("%v", 
reqData.Pager.Page))
+                                       query.Set("per_page", fmt.Sprintf("%v", 
reqData.Pager.Size))
+                                       return query, nil
+                               },
+                               ResponseParser: func(res *http.Response) 
([]json.RawMessage, errors.Error) {
+                                       var items []json.RawMessage
+                                       err := helper.UnmarshalResponse(res, 
&items)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       return items, nil
+                               },
+                       },
+                       GetCreated: func(item json.RawMessage) (time.Time, 
errors.Error) {
+                               pr := &SimpleGithubApiPr{}
+                               err := json.Unmarshal(item, pr)
+                               if err != nil {
+                                       return time.Time{}, 
errors.BadInput.Wrap(err, "failed to unmarshal github pull request")
+                               }
+                               return pr.CreatedAt, nil
+                       },
+               },
+               CollectUnfinishedDetails: 
helper.FinalizableApiCollectorDetailArgs{
+                       BuildInputIterator: func() (helper.Iterator, 
errors.Error) {
+                               // select pull id from database
+                               cursor, err := db.Cursor(
+                                       dal.Select("github_id"),
+                                       dal.From(&models.GithubPullRequest{}),
+                                       dal.Where(
+                                               "repo_id = ? AND connection_id 
= ? AND state != 'closed'",
+                                               data.Options.GithubId, 
data.Options.ConnectionId,
+                                       ),
+                               )
+                               if err != nil {
+                                       return nil, err
+                               }
+                               return helper.NewDalCursorIterator(db, cursor, 
reflect.TypeOf(SimpleGithubPr{}))
+                       },
+                       FinalizableApiCollectorCommonArgs: 
helper.FinalizableApiCollectorCommonArgs{
+                               UrlTemplate: "repos/{{ .Params.Name }}/pulls/{{ 
.Input.GithubId }}",
+                               ResponseParser: func(res *http.Response) 
([]json.RawMessage, errors.Error) {
+                                       body, err := io.ReadAll(res.Body)
+                                       if err != nil {
+                                               return nil, errors.Convert(err)
+                                       }
+                                       res.Body.Close()
+                                       return []json.RawMessage{body}, nil
+                               },
+                       },
                },
        })
+
        if err != nil {
                return err
        }
 
-       return collectorWithState.Execute()
+       return collector.Execute()
 }
diff --git a/backend/plugins/github_graphql/tasks/check_run_collector.go 
b/backend/plugins/github_graphql/tasks/check_run_collector.go
index e21db4d63..65a4b4c12 100644
--- a/backend/plugins/github_graphql/tasks/check_run_collector.go
+++ b/backend/plugins/github_graphql/tasks/check_run_collector.go
@@ -208,5 +208,5 @@ func CollectCheckRun(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
-       return collectorWithState.ExecuteGraphQL()
+       return collectorWithState.Execute()
 }

Reply via email to