This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new e7280929e feat: gitlab pipeline could have a different ratelimit to
other subtasks (#4370)
e7280929e is described below
commit e7280929efa1a924641385653fe514a70239bfd8
Author: Klesh Wong <[email protected]>
AuthorDate: Fri Feb 10 09:27:48 2023 +0800
feat: gitlab pipeline could have a different ratelimit to other subtasks
(#4370)
* feat: gitlab pipeline could have a different ratelimit to other subtasks
* fix: workscheduler unit test
---
.../helpers/pluginhelper/api/api_async_client.go | 58 +++++++++-------------
backend/helpers/pluginhelper/api/api_collector.go | 35 +++++++++++--
.../helpers/pluginhelper/api/worker_scheduler.go | 51 +++++++++++++------
.../pluginhelper/api/worker_scheduler_test.go | 10 ++--
backend/plugins/gitlab/tasks/api_client.go | 7 +--
backend/plugins/gitlab/tasks/pipeline_collector.go | 9 +++-
6 files changed, 104 insertions(+), 66 deletions(-)
diff --git a/backend/helpers/pluginhelper/api/api_async_client.go
b/backend/helpers/pluginhelper/api/api_async_client.go
index daead3ee1..e45794c44 100644
--- a/backend/helpers/pluginhelper/api/api_async_client.go
+++ b/backend/helpers/pluginhelper/api/api_async_client.go
@@ -21,14 +21,16 @@ import (
"bytes"
"context"
"fmt"
- "github.com/apache/incubator-devlake/core/errors"
- plugin "github.com/apache/incubator-devlake/core/plugin"
- "github.com/apache/incubator-devlake/core/utils"
- "github.com/apache/incubator-devlake/helpers/pluginhelper/common"
"io"
"net/http"
"net/url"
"time"
+
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/log"
+ plugin "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/core/utils"
+ "github.com/apache/incubator-devlake/helpers/pluginhelper/common"
)
// HttpMinStatusRetryCode is which status will retry
@@ -39,9 +41,10 @@ var HttpMinStatusRetryCode = http.StatusBadRequest
// will be performed in parallel with rate-limit support
type ApiAsyncClient struct {
*ApiClient
+ *WorkerScheduler
maxRetry int
- scheduler *WorkerScheduler
numOfWorkers int
+ logger log.Logger
}
const defaultTimeout = 120 * time.Second
@@ -96,20 +99,24 @@ func CreateAsyncApiClient(
// in order for scheduler to hold requests of 3 seconds, we need:
d := duration / RESPONSE_TIME
numOfWorkers := requests / int(d)
+ tickInterval, err := CalcTickInterval(requests, duration)
+ if err != nil {
+ return nil, err
+ }
- logger := taskCtx.GetLogger()
+ logger := taskCtx.GetLogger().Nested("api async client")
logger.Info(
- "creating scheduler for api \"%s\", number of workers: %d,
allowed requests (rate-limit): %d, duration of rate-limit: %f minutes",
+ "creating scheduler for api \"%s\", number of workers: %d, %d
reqs / %s (interval: %s)",
apiClient.GetEndpoint(),
numOfWorkers,
requests,
- duration.Minutes(),
+ duration.String(),
+ tickInterval.String(),
)
scheduler, err := NewWorkerScheduler(
taskCtx.GetContext(),
numOfWorkers,
- requests,
- duration,
+ tickInterval,
logger,
)
if err != nil {
@@ -119,9 +126,10 @@ func CreateAsyncApiClient(
// finally, wrap around api client with async semantics
return &ApiAsyncClient{
apiClient,
- retry,
scheduler,
+ retry,
numOfWorkers,
+ logger,
}, nil
}
@@ -188,8 +196,8 @@ func (apiClient *ApiAsyncClient) DoAsync(
if retry < apiClient.maxRetry && err !=
context.Canceled {
apiClient.logger.Warn(err, "retry #%d calling
%s", retry, path)
retry++
- apiClient.scheduler.NextTick(func()
errors.Error {
-
apiClient.scheduler.SubmitBlocking(request)
+ apiClient.NextTick(func() errors.Error {
+ apiClient.SubmitBlocking(request)
return nil
})
return nil
@@ -206,7 +214,7 @@ func (apiClient *ApiAsyncClient) DoAsync(
// when error occurs
return handler(res)
}
- apiClient.scheduler.SubmitBlocking(request)
+ apiClient.SubmitBlocking(request)
}
// DoGetAsync Enqueue an api get request, the request may be sent sometime in
future in parallel with other api requests
@@ -230,31 +238,11 @@ func (apiClient *ApiAsyncClient) DoPostAsync(
apiClient.DoAsync(http.MethodPost, path, query, body, header, handler,
0)
}
-// WaitAsync blocks until all async requests were done
-func (apiClient *ApiAsyncClient) WaitAsync() errors.Error {
- return apiClient.scheduler.Wait()
-}
-
-// HasError to return if the scheduler has Error
-func (apiClient *ApiAsyncClient) HasError() bool {
- return apiClient.scheduler.HasError()
-}
-
-// NextTick to return the NextTick of scheduler
-func (apiClient *ApiAsyncClient) NextTick(task func() errors.Error) {
- apiClient.scheduler.NextTick(task)
-}
-
// GetNumOfWorkers to return the Workers count of the scheduler.
func (apiClient *ApiAsyncClient) GetNumOfWorkers() int {
return apiClient.numOfWorkers
}
-// Release will release the ApiAsyncClient with scheduler
-func (apiClient *ApiAsyncClient) Release() {
- apiClient.scheduler.Release()
-}
-
// RateLimitedApiClient FIXME ...
type RateLimitedApiClient interface {
DoGetAsync(path string, query url.Values, header http.Header, handler
common.ApiAsyncCallback)
@@ -265,6 +253,8 @@ type RateLimitedApiClient interface {
GetNumOfWorkers() int
GetAfterFunction() common.ApiClientAfterResponse
SetAfterFunction(callback common.ApiClientAfterResponse)
+ Reset(d time.Duration)
+ GetTickInterval() time.Duration
Release()
}
diff --git a/backend/helpers/pluginhelper/api/api_collector.go
b/backend/helpers/pluginhelper/api/api_collector.go
index 98f25ce59..27b6b8c5c 100644
--- a/backend/helpers/pluginhelper/api/api_collector.go
+++ b/backend/helpers/pluginhelper/api/api_collector.go
@@ -21,14 +21,16 @@ import (
"bytes"
"encoding/json"
"fmt"
- "github.com/apache/incubator-devlake/core/dal"
- "github.com/apache/incubator-devlake/core/errors"
- plugin "github.com/apache/incubator-devlake/core/plugin"
- "github.com/apache/incubator-devlake/helpers/pluginhelper/common"
"io"
"net/http"
"net/url"
"text/template"
+ "time"
+
+ "github.com/apache/incubator-devlake/core/dal"
+ "github.com/apache/incubator-devlake/core/errors"
+ plugin "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/pluginhelper/common"
)
// Pager contains pagination information for a api request
@@ -72,7 +74,8 @@ type ApiCollectorArgs struct {
// Incremental indicate if this is an incremental collection, the
existing data won't get deleted if it was true
Incremental bool `comment:"indicate if this collection is incremental
update"`
// ApiClient is an asynchronous api request client with qps
- ApiClient RateLimitedApiClient
+ ApiClient RateLimitedApiClient
+ MinTickInterval *time.Duration
// Input helps us collect data based on previous collected data, like
collecting changelogs based on jira
// issue ids
Input Iterator
@@ -153,6 +156,24 @@ func (collector *ApiCollector) Execute() errors.Error {
}
}
+ // if MinTickInterval was specified
+ if collector.args.MinTickInterval != nil {
+ minTickInterval := *collector.args.MinTickInterval
+ if minTickInterval <= time.Duration(0) {
+ return errors.Default.Wrap(err, "MinTickInterval must
be greater than 0")
+ }
+ oldTickInterval := collector.args.ApiClient.GetTickInterval()
+ if oldTickInterval < minTickInterval {
+ // reset the tick interval only if it exceeded the
specified limit
+ logger.Info("set tick interval to %v",
minTickInterval.String())
+ collector.args.ApiClient.Reset(minTickInterval)
+ defer func() {
+ logger.Info("restore tick interval to %v",
oldTickInterval.String())
+ collector.args.ApiClient.Reset(oldTickInterval)
+ }()
+ }
+ }
+
collector.args.Ctx.SetProgress(0, -1)
if collector.args.Input != nil {
iterator := collector.args.Input
@@ -210,12 +231,16 @@ func (collector *ApiCollector) exec(input interface{}) {
Page: 1,
Size: collector.args.PageSize,
}
+ // fetch the detail
if collector.args.PageSize <= 0 {
collector.fetchAsync(reqData, nil)
+ // fetch pages sequentially
} else if collector.args.GetNextPageCustomData != nil {
collector.fetchPagesSequentially(reqData)
+ // fetch pages in parallel with number of total pages can be
determined from the first page
} else if collector.args.GetTotalPages != nil {
collector.fetchPagesDetermined(reqData)
+ // fetch pages in parallel without number of total pages
} else {
collector.fetchPagesUndetermined(reqData)
}
diff --git a/backend/helpers/pluginhelper/api/worker_scheduler.go
b/backend/helpers/pluginhelper/api/worker_scheduler.go
index f72450b37..c9ee31e85 100644
--- a/backend/helpers/pluginhelper/api/worker_scheduler.go
+++ b/backend/helpers/pluginhelper/api/worker_scheduler.go
@@ -19,15 +19,28 @@ package api
import (
"context"
- "github.com/apache/incubator-devlake/core/errors"
- "github.com/apache/incubator-devlake/core/log"
"sync"
"sync/atomic"
"time"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/log"
+
"github.com/panjf2000/ants/v2"
)
+// CalcTickInterval calculates the tick interval for a number of work items
+// to be executed within the specified duration
+func CalcTickInterval(numOfWorks int, duration time.Duration) (time.Duration,
errors.Error) {
+ if numOfWorks <= 0 {
+ return 0, errors.Default.New("numOfWorks must be greater than
0")
+ }
+ if duration <= 0 {
+ return 0, errors.Default.New("duration must be greater than 0")
+ }
+ return duration / time.Duration(numOfWorks), nil
+}
+
// WorkerScheduler runs asynchronous tasks in parallel with throttling support
type WorkerScheduler struct {
waitGroup sync.WaitGroup
@@ -38,6 +51,7 @@ type WorkerScheduler struct {
mu sync.Mutex
counter int32
logger log.Logger
+ tickInterval time.Duration
}
//var callframeEnabled = os.Getenv("ASYNC_CF") == "true"
@@ -45,23 +59,17 @@ type WorkerScheduler struct {
// NewWorkerScheduler creates a WorkerScheduler
func NewWorkerScheduler(
ctx context.Context,
- workerNum int,
- maxWork int,
- maxWorkDuration time.Duration,
+ numOfWorkers int,
+ tickInterval time.Duration,
logger log.Logger,
) (*WorkerScheduler, errors.Error) {
- if maxWork <= 0 {
- return nil, errors.Default.New("maxWork less than 1")
- }
- if maxWorkDuration <= 0 {
- return nil, errors.Default.New("maxWorkDuration less than 1")
- }
s := &WorkerScheduler{
- ctx: ctx,
- ticker: time.NewTicker(maxWorkDuration /
time.Duration(maxWork)),
- logger: logger,
+ ctx: ctx,
+ logger: logger,
+ tickInterval: tickInterval,
+ ticker: time.NewTicker(tickInterval),
}
- pool, err := ants.NewPool(workerNum, ants.WithPanicHandler(func(i
interface{}) {
+ pool, err := ants.NewPool(numOfWorkers, ants.WithPanicHandler(func(i
interface{}) {
s.checkError(i)
}))
if err != nil {
@@ -148,7 +156,7 @@ func (s *WorkerScheduler) NextTick(task func()
errors.Error) {
}
// Wait blocks current go-routine until all workers returned
-func (s *WorkerScheduler) Wait() errors.Error {
+func (s *WorkerScheduler) WaitAsync() errors.Error {
s.waitGroup.Wait()
if len(s.workerErrors) > 0 {
for _, err := range s.workerErrors {
@@ -161,6 +169,17 @@ func (s *WorkerScheduler) Wait() errors.Error {
return nil
}
+// Reset stops a WorkerScheduler and resets its period to the specified duration.
+func (s *WorkerScheduler) Reset(interval time.Duration) {
+ s.tickInterval = interval
+ s.ticker.Reset(interval)
+}
+
+// GetTickInterval returns current tick interval of the WorkerScheduler
+func (s *WorkerScheduler) GetTickInterval() time.Duration {
+ return s.tickInterval
+}
+
// Release resources
func (s *WorkerScheduler) Release() {
s.waitGroup.Wait()
diff --git a/backend/helpers/pluginhelper/api/worker_scheduler_test.go
b/backend/helpers/pluginhelper/api/worker_scheduler_test.go
index 9a4b5e88f..077ae3bbd 100644
--- a/backend/helpers/pluginhelper/api/worker_scheduler_test.go
+++ b/backend/helpers/pluginhelper/api/worker_scheduler_test.go
@@ -19,11 +19,12 @@ package api
import (
"context"
- "github.com/apache/incubator-devlake/core/errors"
- "github.com/apache/incubator-devlake/helpers/unithelper"
"testing"
"time"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/helpers/unithelper"
+
"github.com/stretchr/testify/assert"
)
@@ -31,7 +32,8 @@ func TestWorkerSchedulerQpsControl(t *testing.T) {
// assuming we want 2 requests per second
testChannel := make(chan int, 100)
ctx, cancel := context.WithCancel(context.Background())
- s, _ := NewWorkerScheduler(ctx, 5, 2, 1*time.Second,
unithelper.DummyLogger())
+ tickInterval, _ := CalcTickInterval(2, 1*time.Second)
+ s, _ := NewWorkerScheduler(ctx, 5, tickInterval,
unithelper.DummyLogger())
defer s.Release()
for i := 1; i <= 5; i++ {
t := i
@@ -56,7 +58,7 @@ func TestWorkerSchedulerQpsControl(t *testing.T) {
if len(testChannel) > 4 {
t.Fatal(`worker run too fast after a second`)
}
- assert.Nil(t, s.Wait())
+ assert.Nil(t, s.WaitAsync())
if len(testChannel) != 5 {
t.Fatal(`worker not wait until finish`)
}
diff --git a/backend/plugins/gitlab/tasks/api_client.go
b/backend/plugins/gitlab/tasks/api_client.go
index 333378473..df4517ba0 100644
--- a/backend/plugins/gitlab/tasks/api_client.go
+++ b/backend/plugins/gitlab/tasks/api_client.go
@@ -48,12 +48,7 @@ func NewGitlabApiClient(taskCtx plugin.TaskContext,
connection *models.GitlabCon
if err != nil {
return 0, 0, errors.Default.Wrap(err, "failed
to parse RateLimit-Limit header")
}
- // seems like gitlab rate limit is on minute basis
- if rateLimit > 200 {
- return 200, 1 * time.Minute, nil
- } else {
- return rateLimit, 1 * time.Minute, nil
- }
+ return rateLimit, 1 * time.Minute, nil
},
}
asyncApiClient, err := api.CreateAsyncApiClient(
diff --git a/backend/plugins/gitlab/tasks/pipeline_collector.go
b/backend/plugins/gitlab/tasks/pipeline_collector.go
index 5987f641f..d8a487f30 100644
--- a/backend/plugins/gitlab/tasks/pipeline_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_collector.go
@@ -19,10 +19,12 @@ package tasks
import (
"fmt"
+ "net/url"
+ "time"
+
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
- "net/url"
)
const RAW_PIPELINE_TABLE = "gitlab_api_pipeline"
@@ -42,10 +44,15 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
+ tickInterval, err := helper.CalcTickInterval(200, 1*time.Minute)
+ if err != nil {
+ return err
+ }
incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
+ MinTickInterval: &tickInterval,
PageSize: 100,
Incremental: incremental,
UrlTemplate: "projects/{{ .Params.ProjectId
}}/pipelines",