This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new e7280929e feat: gitlab pipeline could have a different ratelimit to 
other subtasks (#4370)
e7280929e is described below

commit e7280929efa1a924641385653fe514a70239bfd8
Author: Klesh Wong <[email protected]>
AuthorDate: Fri Feb 10 09:27:48 2023 +0800

    feat: gitlab pipeline could have a different ratelimit to other subtasks 
(#4370)
    
    * feat: gitlab pipeline could have a different ratelimit to other subtasks
    
    * fix: workscheduler unit test
---
 .../helpers/pluginhelper/api/api_async_client.go   | 58 +++++++++-------------
 backend/helpers/pluginhelper/api/api_collector.go  | 35 +++++++++++--
 .../helpers/pluginhelper/api/worker_scheduler.go   | 51 +++++++++++++------
 .../pluginhelper/api/worker_scheduler_test.go      | 10 ++--
 backend/plugins/gitlab/tasks/api_client.go         |  7 +--
 backend/plugins/gitlab/tasks/pipeline_collector.go |  9 +++-
 6 files changed, 104 insertions(+), 66 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/api_async_client.go 
b/backend/helpers/pluginhelper/api/api_async_client.go
index daead3ee1..e45794c44 100644
--- a/backend/helpers/pluginhelper/api/api_async_client.go
+++ b/backend/helpers/pluginhelper/api/api_async_client.go
@@ -21,14 +21,16 @@ import (
        "bytes"
        "context"
        "fmt"
-       "github.com/apache/incubator-devlake/core/errors"
-       plugin "github.com/apache/incubator-devlake/core/plugin"
-       "github.com/apache/incubator-devlake/core/utils"
-       "github.com/apache/incubator-devlake/helpers/pluginhelper/common"
        "io"
        "net/http"
        "net/url"
        "time"
+
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/core/log"
+       plugin "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/core/utils"
+       "github.com/apache/incubator-devlake/helpers/pluginhelper/common"
 )
 
 // HttpMinStatusRetryCode is which status will retry
@@ -39,9 +41,10 @@ var HttpMinStatusRetryCode = http.StatusBadRequest
 // will be performed in parallel with rate-limit support
 type ApiAsyncClient struct {
        *ApiClient
+       *WorkerScheduler
        maxRetry     int
-       scheduler    *WorkerScheduler
        numOfWorkers int
+       logger       log.Logger
 }
 
 const defaultTimeout = 120 * time.Second
@@ -96,20 +99,24 @@ func CreateAsyncApiClient(
        // in order for scheduler to hold requests of 3 seconds, we need:
        d := duration / RESPONSE_TIME
        numOfWorkers := requests / int(d)
+       tickInterval, err := CalcTickInterval(requests, duration)
+       if err != nil {
+               return nil, err
+       }
 
-       logger := taskCtx.GetLogger()
+       logger := taskCtx.GetLogger().Nested("api async client")
        logger.Info(
-               "creating scheduler for api \"%s\", number of workers: %d, 
allowed requests (rate-limit): %d, duration of rate-limit: %f minutes",
+               "creating scheduler for api \"%s\", number of workers: %d, %d 
reqs / %s (interval: %s)",
                apiClient.GetEndpoint(),
                numOfWorkers,
                requests,
-               duration.Minutes(),
+               duration.String(),
+               tickInterval.String(),
        )
        scheduler, err := NewWorkerScheduler(
                taskCtx.GetContext(),
                numOfWorkers,
-               requests,
-               duration,
+               tickInterval,
                logger,
        )
        if err != nil {
@@ -119,9 +126,10 @@ func CreateAsyncApiClient(
        // finally, wrap around api client with async sematic
        return &ApiAsyncClient{
                apiClient,
-               retry,
                scheduler,
+               retry,
                numOfWorkers,
+               logger,
        }, nil
 }
 
@@ -188,8 +196,8 @@ func (apiClient *ApiAsyncClient) DoAsync(
                        if retry < apiClient.maxRetry && err != 
context.Canceled {
                                apiClient.logger.Warn(err, "retry #%d calling 
%s", retry, path)
                                retry++
-                               apiClient.scheduler.NextTick(func() 
errors.Error {
-                                       
apiClient.scheduler.SubmitBlocking(request)
+                               apiClient.NextTick(func() errors.Error {
+                                       apiClient.SubmitBlocking(request)
                                        return nil
                                })
                                return nil
@@ -206,7 +214,7 @@ func (apiClient *ApiAsyncClient) DoAsync(
                // when error occurs
                return handler(res)
        }
-       apiClient.scheduler.SubmitBlocking(request)
+       apiClient.SubmitBlocking(request)
 }
 
 // DoGetAsync Enqueue an api get request, the request may be sent sometime in 
future in parallel with other api requests
@@ -230,31 +238,11 @@ func (apiClient *ApiAsyncClient) DoPostAsync(
        apiClient.DoAsync(http.MethodPost, path, query, body, header, handler, 
0)
 }
 
-// WaitAsync blocks until all async requests were done
-func (apiClient *ApiAsyncClient) WaitAsync() errors.Error {
-       return apiClient.scheduler.Wait()
-}
-
-// HasError to return if the scheduler has Error
-func (apiClient *ApiAsyncClient) HasError() bool {
-       return apiClient.scheduler.HasError()
-}
-
-// NextTick to return the NextTick of scheduler
-func (apiClient *ApiAsyncClient) NextTick(task func() errors.Error) {
-       apiClient.scheduler.NextTick(task)
-}
-
 // GetNumOfWorkers to return the Workers count if scheduler.
 func (apiClient *ApiAsyncClient) GetNumOfWorkers() int {
        return apiClient.numOfWorkers
 }
 
-// Release will release the ApiAsyncClient with scheduler
-func (apiClient *ApiAsyncClient) Release() {
-       apiClient.scheduler.Release()
-}
-
 // RateLimitedApiClient FIXME ...
 type RateLimitedApiClient interface {
        DoGetAsync(path string, query url.Values, header http.Header, handler 
common.ApiAsyncCallback)
@@ -265,6 +253,8 @@ type RateLimitedApiClient interface {
        GetNumOfWorkers() int
        GetAfterFunction() common.ApiClientAfterResponse
        SetAfterFunction(callback common.ApiClientAfterResponse)
+       Reset(d time.Duration)
+       GetTickInterval() time.Duration
        Release()
 }
 
diff --git a/backend/helpers/pluginhelper/api/api_collector.go 
b/backend/helpers/pluginhelper/api/api_collector.go
index 98f25ce59..27b6b8c5c 100644
--- a/backend/helpers/pluginhelper/api/api_collector.go
+++ b/backend/helpers/pluginhelper/api/api_collector.go
@@ -21,14 +21,16 @@ import (
        "bytes"
        "encoding/json"
        "fmt"
-       "github.com/apache/incubator-devlake/core/dal"
-       "github.com/apache/incubator-devlake/core/errors"
-       plugin "github.com/apache/incubator-devlake/core/plugin"
-       "github.com/apache/incubator-devlake/helpers/pluginhelper/common"
        "io"
        "net/http"
        "net/url"
        "text/template"
+       "time"
+
+       "github.com/apache/incubator-devlake/core/dal"
+       "github.com/apache/incubator-devlake/core/errors"
+       plugin "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/helpers/pluginhelper/common"
 )
 
 // Pager contains pagination information for a api request
@@ -72,7 +74,8 @@ type ApiCollectorArgs struct {
        // Incremental indicate if this is an incremental collection, the 
existing data won't get deleted if it was true
        Incremental bool `comment:"indicate if this collection is incremental 
update"`
        // ApiClient is a asynchronize api request client with qps
-       ApiClient RateLimitedApiClient
+       ApiClient       RateLimitedApiClient
+       MinTickInterval *time.Duration
        // Input helps us collect data based on previous collected data, like 
collecting changelogs based on jira
        // issue ids
        Input Iterator
@@ -153,6 +156,24 @@ func (collector *ApiCollector) Execute() errors.Error {
                }
        }
 
+       // if MinTickInterval was specified
+       if collector.args.MinTickInterval != nil {
+               minTickInterval := *collector.args.MinTickInterval
+               if minTickInterval <= time.Duration(0) {
+                       return errors.Default.Wrap(err, "MinTickInterval must 
be greater than 0")
+               }
+               oldTickInterval := collector.args.ApiClient.GetTickInterval()
+               if oldTickInterval < minTickInterval {
+                       // reset the tick interval only if it exceeded the 
specified limit
+                       logger.Info("set tick interval to %v", 
minTickInterval.String())
+                       collector.args.ApiClient.Reset(minTickInterval)
+                       defer func() {
+                               logger.Info("restore tick interval to %v", 
oldTickInterval.String())
+                               collector.args.ApiClient.Reset(oldTickInterval)
+                       }()
+               }
+       }
+
        collector.args.Ctx.SetProgress(0, -1)
        if collector.args.Input != nil {
                iterator := collector.args.Input
@@ -210,12 +231,16 @@ func (collector *ApiCollector) exec(input interface{}) {
                Page: 1,
                Size: collector.args.PageSize,
        }
+       // featch the detail
        if collector.args.PageSize <= 0 {
                collector.fetchAsync(reqData, nil)
+               // fetch pages sequentially
        } else if collector.args.GetNextPageCustomData != nil {
                collector.fetchPagesSequentially(reqData)
+               // fetch pages in parallel with number of total pages can be 
determined from the first page
        } else if collector.args.GetTotalPages != nil {
                collector.fetchPagesDetermined(reqData)
+               // fetch pages in parallel without number of total pages
        } else {
                collector.fetchPagesUndetermined(reqData)
        }
diff --git a/backend/helpers/pluginhelper/api/worker_scheduler.go 
b/backend/helpers/pluginhelper/api/worker_scheduler.go
index f72450b37..c9ee31e85 100644
--- a/backend/helpers/pluginhelper/api/worker_scheduler.go
+++ b/backend/helpers/pluginhelper/api/worker_scheduler.go
@@ -19,15 +19,28 @@ package api
 
 import (
        "context"
-       "github.com/apache/incubator-devlake/core/errors"
-       "github.com/apache/incubator-devlake/core/log"
        "sync"
        "sync/atomic"
        "time"
 
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/core/log"
+
        "github.com/panjf2000/ants/v2"
 )
 
+// CalcTickInterval calculates tick interval for number of works to be
+// executed in specified duration
+func CalcTickInterval(numOfWorks int, duration time.Duration) (time.Duration, 
errors.Error) {
+       if numOfWorks <= 0 {
+               return 0, errors.Default.New("numOfWorks must be greater than 
0")
+       }
+       if duration <= 0 {
+               return 0, errors.Default.New("duration must be greater than 0")
+       }
+       return duration / time.Duration(numOfWorks), nil
+}
+
 // WorkerScheduler runs asynchronous tasks in parallel with throttling support
 type WorkerScheduler struct {
        waitGroup    sync.WaitGroup
@@ -38,6 +51,7 @@ type WorkerScheduler struct {
        mu           sync.Mutex
        counter      int32
        logger       log.Logger
+       tickInterval time.Duration
 }
 
 //var callframeEnabled = os.Getenv("ASYNC_CF") == "true"
@@ -45,23 +59,17 @@ type WorkerScheduler struct {
 // NewWorkerScheduler creates a WorkerScheduler
 func NewWorkerScheduler(
        ctx context.Context,
-       workerNum int,
-       maxWork int,
-       maxWorkDuration time.Duration,
+       numOfWorkers int,
+       tickInterval time.Duration,
        logger log.Logger,
 ) (*WorkerScheduler, errors.Error) {
-       if maxWork <= 0 {
-               return nil, errors.Default.New("maxWork less than 1")
-       }
-       if maxWorkDuration <= 0 {
-               return nil, errors.Default.New("maxWorkDuration less than 1")
-       }
        s := &WorkerScheduler{
-               ctx:    ctx,
-               ticker: time.NewTicker(maxWorkDuration / 
time.Duration(maxWork)),
-               logger: logger,
+               ctx:          ctx,
+               logger:       logger,
+               tickInterval: tickInterval,
+               ticker:       time.NewTicker(tickInterval),
        }
-       pool, err := ants.NewPool(workerNum, ants.WithPanicHandler(func(i 
interface{}) {
+       pool, err := ants.NewPool(numOfWorkers, ants.WithPanicHandler(func(i 
interface{}) {
                s.checkError(i)
        }))
        if err != nil {
@@ -148,7 +156,7 @@ func (s *WorkerScheduler) NextTick(task func() 
errors.Error) {
 }
 
 // Wait blocks current go-routine until all workers returned
-func (s *WorkerScheduler) Wait() errors.Error {
+func (s *WorkerScheduler) WaitAsync() errors.Error {
        s.waitGroup.Wait()
        if len(s.workerErrors) > 0 {
                for _, err := range s.workerErrors {
@@ -161,6 +169,17 @@ func (s *WorkerScheduler) Wait() errors.Error {
        return nil
 }
 
+// Reset stops a WorkScheduler and resets its period to the specified duration.
+func (s *WorkerScheduler) Reset(interval time.Duration) {
+       s.tickInterval = interval
+       s.ticker.Reset(interval)
+}
+
+// GetTickInterval returns current tick interval of the WorkScheduler
+func (s *WorkerScheduler) GetTickInterval() time.Duration {
+       return s.tickInterval
+}
+
 // Release resources
 func (s *WorkerScheduler) Release() {
        s.waitGroup.Wait()
diff --git a/backend/helpers/pluginhelper/api/worker_scheduler_test.go 
b/backend/helpers/pluginhelper/api/worker_scheduler_test.go
index 9a4b5e88f..077ae3bbd 100644
--- a/backend/helpers/pluginhelper/api/worker_scheduler_test.go
+++ b/backend/helpers/pluginhelper/api/worker_scheduler_test.go
@@ -19,11 +19,12 @@ package api
 
 import (
        "context"
-       "github.com/apache/incubator-devlake/core/errors"
-       "github.com/apache/incubator-devlake/helpers/unithelper"
        "testing"
        "time"
 
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/helpers/unithelper"
+
        "github.com/stretchr/testify/assert"
 )
 
@@ -31,7 +32,8 @@ func TestWorkerSchedulerQpsControl(t *testing.T) {
        // assuming we want 2 requests per second
        testChannel := make(chan int, 100)
        ctx, cancel := context.WithCancel(context.Background())
-       s, _ := NewWorkerScheduler(ctx, 5, 2, 1*time.Second, 
unithelper.DummyLogger())
+       tickInterval, _ := CalcTickInterval(2, 1*time.Second)
+       s, _ := NewWorkerScheduler(ctx, 5, tickInterval, 
unithelper.DummyLogger())
        defer s.Release()
        for i := 1; i <= 5; i++ {
                t := i
@@ -56,7 +58,7 @@ func TestWorkerSchedulerQpsControl(t *testing.T) {
        if len(testChannel) > 4 {
                t.Fatal(`worker run too fast after a second`)
        }
-       assert.Nil(t, s.Wait())
+       assert.Nil(t, s.WaitAsync())
        if len(testChannel) != 5 {
                t.Fatal(`worker not wait until finish`)
        }
diff --git a/backend/plugins/gitlab/tasks/api_client.go 
b/backend/plugins/gitlab/tasks/api_client.go
index 333378473..df4517ba0 100644
--- a/backend/plugins/gitlab/tasks/api_client.go
+++ b/backend/plugins/gitlab/tasks/api_client.go
@@ -48,12 +48,7 @@ func NewGitlabApiClient(taskCtx plugin.TaskContext, 
connection *models.GitlabCon
                        if err != nil {
                                return 0, 0, errors.Default.Wrap(err, "failed 
to parse RateLimit-Limit header")
                        }
-                       // seems like gitlab rate limit is on minute basis
-                       if rateLimit > 200 {
-                               return 200, 1 * time.Minute, nil
-                       } else {
-                               return rateLimit, 1 * time.Minute, nil
-                       }
+                       return rateLimit, 1 * time.Minute, nil
                },
        }
        asyncApiClient, err := api.CreateAsyncApiClient(
diff --git a/backend/plugins/gitlab/tasks/pipeline_collector.go 
b/backend/plugins/gitlab/tasks/pipeline_collector.go
index 5987f641f..d8a487f30 100644
--- a/backend/plugins/gitlab/tasks/pipeline_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_collector.go
@@ -19,10 +19,12 @@ package tasks
 
 import (
        "fmt"
+       "net/url"
+       "time"
+
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
        helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-       "net/url"
 )
 
 const RAW_PIPELINE_TABLE = "gitlab_api_pipeline"
@@ -42,10 +44,15 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
 
+       tickInterval, err := helper.CalcTickInterval(200, 1*time.Minute)
+       if err != nil {
+               return err
+       }
        incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                RawDataSubTaskArgs: *rawDataSubTaskArgs,
                ApiClient:          data.ApiClient,
+               MinTickInterval:    &tickInterval,
                PageSize:           100,
                Incremental:        incremental,
                UrlTemplate:        "projects/{{ .Params.ProjectId 
}}/pipelines",

Reply via email to