likyh commented on code in PR #2053:
URL: https://github.com/apache/incubator-devlake/pull/2053#discussion_r890094608


##########
config/config.go:
##########
@@ -69,11 +69,14 @@ func replaceNewEnvItemInOldContent(v *viper.Viper, 
envFileContent string) (error
                        switch ret := val.(type) {
                        case string:
                                ret = strings.Replace(ret, `\`, `\\`, -1)
-                               ret = strings.Replace(ret, `=`, `\=`, -1)
-                               ret = strings.Replace(ret, `'`, `\'`, -1)
+                               //ret = strings.Replace(ret, `=`, `\=`, -1)
+                               //ret = strings.Replace(ret, `'`, `\'`, -1)

Review Comment:
   why?



##########
Makefile:
##########
@@ -51,9 +51,13 @@ configure-dev:
 commit:
        git cz
 
+mock:

Review Comment:
   lint did not pass



##########
helpers/unithelper/dummy_logger.go:
##########
@@ -0,0 +1,18 @@
+package unithelper

Review Comment:
   There are some errors when I run `mockery --all --unroll-variadic=false`.
   
   ```
   07 Jun 22 16:27 CST ERR Error parsing file 
error="/Users/lin/projects/lake/helpers/unithelper/dummy_logger.go:4:2: could 
not import github.com/apache/incubator-devlake/mocks (invalid package name: 
\"\")" dry-run=false version=v2.12.3
   07 Jun 22 16:27 CST ERR Error parsing file 
error="/Users/lin/projects/lake/helpers/unithelper/dummy_logger.go:4:2: could 
not import github.com/apache/incubator-devlake/mocks (invalid package name: 
\"\")" dry-run=false version=v2.12.3
   ```



##########
plugins/helper/worker_scheduler.go:
##########
@@ -22,133 +22,155 @@ import (
        "fmt"
        "os"
        "sync"
+       "sync/atomic"
        "time"
 
+       "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/utils"
        "github.com/panjf2000/ants/v2"
 )
 
+// WorkerScheduler runs asynchronous tasks in parallel with throttling support
 type WorkerScheduler struct {
-       waitGroup    *sync.WaitGroup
+       waitGroup    sync.WaitGroup
        pool         *ants.Pool
-       subPool      *ants.Pool
        ticker       *time.Ticker
-       workerErrors *[]error
+       workerErrors []error
        ctx          context.Context
+       mu           sync.Mutex
+       counter      int32
+       logger       core.Logger
 }
 
-// NewWorkerScheduler 创建一个并行执行的调度器,控制最大运行数和每秒最大运行数量
-// NewWorkerScheduler Create a parallel scheduler to control the maximum 
number of runs and the maximum number of runs per second
-// 注意: task执行是无序的
-// Warning: task execution is out of order
-func NewWorkerScheduler(workerNum int, maxWork int, maxWorkDuration 
time.Duration, ctx context.Context, maxRetry int) (*WorkerScheduler, error) {
-       var waitGroup sync.WaitGroup
-       workerErrors := make([]error, 0)
-       pWorkerErrors := &workerErrors
-       var mux sync.Mutex
-       pool, err := ants.NewPool(workerNum, ants.WithPanicHandler(func(i 
interface{}) {
-               mux.Lock()
-               defer mux.Unlock()
-               workerErrors = append(*pWorkerErrors, i.(error))
-               pWorkerErrors = &workerErrors
-       }))
-       if err != nil {
-               return nil, err
+var callframeEnabled = os.Getenv("ASYNC_CF") == "true"
+
+// NewWorkerScheduler creates a WorkerScheduler
+func NewWorkerScheduler(
+       workerNum int,
+       maxWork int,
+       maxWorkDuration time.Duration,
+       ctx context.Context,
+       maxRetry int,
+       logger core.Logger,
+) (*WorkerScheduler, error) {
+       if maxWork <= 0 {
+               return nil, fmt.Errorf("maxWork less than 1")
        }
-       subPool, err := ants.NewPool(workerNum*maxRetry, 
ants.WithPanicHandler(func(i interface{}) {
-               mux.Lock()
-               defer mux.Unlock()
-               workerErrors = append(*pWorkerErrors, i.(error))
-               pWorkerErrors = &workerErrors
+       if maxWorkDuration <= 0 {
+               return nil, fmt.Errorf("maxWorkDuration less than 1")
+       }
+       s := &WorkerScheduler{
+               ctx:    ctx,
+               ticker: time.NewTicker(maxWorkDuration / 
time.Duration(maxWork)),
+               logger: logger,
+       }
+       pool, err := ants.NewPool(workerNum, ants.WithPanicHandler(func(i 
interface{}) {
+               s.mu.Lock()
+               defer s.mu.Unlock()
+               s.workerErrors = append(s.workerErrors, i.(error))
        }))
        if err != nil {
                return nil, err
        }
-       var ticker *time.Ticker
-       if maxWork > 0 {
-               ticker = time.NewTicker(maxWorkDuration / 
time.Duration(maxWork))
-       }
-       scheduler := &WorkerScheduler{
-               waitGroup:    &waitGroup,
-               pool:         pool,
-               subPool:      subPool,
-               ticker:       ticker,
-               workerErrors: pWorkerErrors,
-               ctx:          ctx,
-       }
-       return scheduler, nil
+       s.pool = pool
+       return s, nil
 }
 
-func (s *WorkerScheduler) Submit(task func() error, pool ...*ants.Pool) error {
-       select {
-       case <-s.ctx.Done():
-               return s.ctx.Err()
-       default:
-       }
-       s.waitGroup.Add(1)
+// SubmitBlocking enqueues a async task to ants, the task will be executed in 
future when timing is right.
+// It doesn't return error because it wouldn't be any when with a Blocking 
semantic, returned error does nothing but
+// causing confusion, more often, people thought it is returned by the task.
+// Since it is async task, the callframes would not be available for 
production mode, you can export Environment
+// Varaible ASYNC_CF=true to enable callframes capturing when debugging.
+// IMPORTANT: do NOT call SubmitBlocking inside the async task, it is likely 
to cause a deadlock, call
+// SubmitNonBlocking instead when number of tasks is relatively small.
+func (s *WorkerScheduler) SubmitBlocking(task func() error) {
        // this is expensive, enable by EnvVar
-       cf := "set Environment Varaible ASYNC_CF=true to enable callframes 
information"
-       if os.Getenv("ASYNC_CF") == "true" {
-               cf = utils.GatherCallFrames()
-       }
-       var currentPool *ants.Pool
-       if pool == nil {
-               currentPool = s.pool
-       } else {
-               currentPool = pool[0]
+       cf := s.gatherCallFrames()
+       // to make sure task is done
+       if len(s.workerErrors) > 0 {
+               // not point to continue
+               return
        }
+       s.waitGroup.Add(1)
+       err := s.pool.Submit(func() {
+               defer s.waitGroup.Done()
+
+               id := atomic.AddInt32(&s.counter, 1)
+               s.logger.Debug("schedulerJob >>> %d started", id)
+               defer s.logger.Debug("schedulerJob <<< %d ended", id)
 
-       return currentPool.Submit(func() {
                var err error
-               defer s.waitGroup.Done()
-               defer func() {
-                       if err == nil {
-                               r := recover()
-                               if r != nil {
-                                       err = fmt.Errorf("%s", r)
-                               }
-                       }
-                       if err != nil {
-                               panic(fmt.Errorf("%s\n%s", err, cf))
-                       }
-               }()
-               if pool == nil && s.ticker != nil {
-                       for s.subPool.Running() != 0 {
-                               <-s.ticker.C
-                       }
-               }
-               if s.ticker != nil {
-                       <-s.ticker.C
+               defer s.gatherError(err, cf)
+
+               if len(s.workerErrors) > 0 {
+                       // not point to continue
+                       return
                }
+               // wait for rate limit throttling
                select {
                case <-s.ctx.Done():
                        err = s.ctx.Err()
-               default:
+               case <-s.ticker.C:
                        err = task()
                }
        })
+       // failed to submit, note that this is not task erro
+       if err != nil {
+               s.gatherError(err, cf)
+       }
+}
+
+func (s *WorkerScheduler) gatherCallFrames() string {
+       cf := "set Environment Varaible ASYNC_CF=true to enable callframes 
capturing"
+       if callframeEnabled {
+               cf = utils.GatherCallFrames(1)
+       }
+       return cf
+}
+
+func (s *WorkerScheduler) gatherError(err error, callframs string) {
+       if err == nil {
+               r := recover()
+               if r != nil {
+                       err = fmt.Errorf("%s\n%s", r, callframs)
+               }
+       }
+       if err != nil {
+               s.mu.Lock()
+               defer s.mu.Unlock()
+               s.workerErrors = append(s.workerErrors, err)
+       }
 }
 
-func (s *WorkerScheduler) WaitUntilFinish() error {
+// NextTick enqueues task in a NonBlocking manner, you should only call this 
method within task submitted by
+// SubmitBlocking method
+// IMPORTANT: do NOT call this method with a huge number of tasks, it is 
likely to eat up all available memory
+func (s *WorkerScheduler) NextTick(task func() error) {
+       cf := s.gatherCallFrames()
+       // to make sure task will be enqueued
+       s.waitGroup.Add(1)
+       go func() {
+               var err error
+               defer s.waitGroup.Done()
+               defer s.gatherError(err, cf)
+               err = task()
+       }()
+}
+
+// Wait blocks current go-routine until all workers returned
+func (s *WorkerScheduler) Wait() error {

Review Comment:
   Maybe `WaitUntilFinish` would be a more specific name?



##########
plugins/helper/api_collector.go:
##########
@@ -131,144 +117,140 @@ func (collector *ApiCollector) Execute() error {
        logger.Info("start api collection")
 
        // make sure table is created
-       db := collector.args.Ctx.GetDb()
-       err := db.Table(collector.table).AutoMigrate(&RawData{})
+       db := collector.args.Ctx.GetDal()
+       err := db.AutoMigrate(&RawData{}, dal.From(collector.table))
        if err != nil {
                return err
        }
 
        // flush data if not incremental collection
        if !collector.args.Incremental {
-               err = db.Table(collector.table).Delete(&RawData{}, "params = 
?", collector.params).Error
+               err = db.Delete(&RawData{}, dal.From(collector.table), 
dal.Where("params = ?", collector.params))
                if err != nil {
                        return err
                }
        }
 
+       collector.args.Ctx.SetProgress(0, -1)
        if collector.args.Input != nil {
-               collector.args.Ctx.SetProgress(0, -1)
-               // load all rows from iterator, and do multiple `exec` 
accordingly
-               // TODO: this loads all records into memory, we need lazy-load
                iterator := collector.args.Input
+               apiClient := collector.args.ApiClient
                defer iterator.Close()
-               // throttle input process speed so it can be canceled, create a 
channel to represent available slots
-               slots := int(math.Ceil(collector.args.ApiClient.GetQps())) * 2
-               if slots <= 0 {
-                       return fmt.Errorf("RateLimit can't use the 0 Qps")
-               }
-               slotsChan := make(chan bool, slots)
-               defer close(slotsChan)
-               for i := 0; i < slots; i++ {
-                       slotsChan <- true
-               }
-
-               errors := make(chan error, slots)
-               defer close(errors)
-
-               var wg sync.WaitGroup
-               ctx := collector.args.Ctx.GetContext()
-
-       out:
-               for iterator.HasNext() {
-                       select {
-                       // canceled by user, stop
-                       case <-ctx.Done():
-                               err = ctx.Err()
-                               break out
-                       // obtain a slot
-                       case <-slotsChan:
-                               input, err := iterator.Fetch()
-                               if err != nil {
-                                       break out
-                               }
-                               wg.Add(1)
-                               go func() {
-                                       defer func() {
-                                               wg.Done()
-                                               recover() //nolint TODO: check 
the return and do log if not nil
-                                       }()
-                                       e := collector.exec(input)
-                                       // propagate error
-                                       if e != nil {
-                                               errors <- e
-                                       } else {
-                                               // release 1 slot
-                                               slotsChan <- true
-                                       }
-                               }()
-                       case err = <-errors:
-                               break out
+               for iterator.HasNext() && !apiClient.HasError() {
+                       input, err := iterator.Fetch()
+                       if err != nil {
+                               break
                        }
-               }
-               if err == nil {
-                       wg.Wait()
+                       collector.exec(input)
                }
        } else {
                // or we just did it once
-               err = collector.exec(nil)
+               collector.exec(nil)
        }
 
        if err != nil {
                return err
        }
        logger.Debug("wait for all async api to finished")
        err = collector.args.ApiClient.WaitAsync()
-       logger.Info("end api collection")
+       logger.Info("end api collection error: %w", err)
        return err
 }
 
-func (collector *ApiCollector) exec(input interface{}) error {
+func (collector *ApiCollector) exec(input interface{}) {
+       inputJson, err := json.Marshal(input)
+       if err != nil {
+               panic(err)
+       }
        reqData := new(RequestData)
        reqData.Input = input
-       if collector.args.PageSize <= 0 {
-               // collect detail of a record
-               return collector.fetchAsync(reqData, 
collector.handleResponse(reqData))
+       reqData.InputJSON = inputJson
+       reqData.Pager = &Pager{
+               Page: 1,
+               Size: collector.args.PageSize,
        }
-       // collect multiple pages
-       var err error
-       if collector.args.GetTotalPages != nil {
-               /* when total pages is available from api*/
-               // fetch the very first page
-               err = collector.fetchAsync(reqData, 
collector.handleResponseWithPages(reqData))
+       if collector.args.PageSize <= 0 {
+               collector.fetchAsync(reqData, nil)
+       } else if collector.args.GetTotalPages != nil {
+               collector.fetchPagesDetermined(reqData)
        } else {
-               // if api doesn't return total number of pages, employ a step 
concurrent technique
-               // when `Concurrency` was set to 3:
-               // goroutine #1 fetches pages 1/4/7..
-               // goroutine #2 fetches pages 2/5/8...
-               // goroutine #3 fetches pages 3/6/9...
-               errs := make(chan error, collector.args.Concurrency)
-               var errCount int
-               // cancel can only be called when error occurs, because we are 
doomed anyway.
-               ctx, cancel := 
context.WithCancel(collector.args.Ctx.GetContext())
-               defer cancel()
-               for i := 0; i < collector.args.Concurrency; i++ {
-                       reqDataTemp := RequestData{
-                               Pager: &Pager{
-                                       Page: i + 1,
-                                       Size: collector.args.PageSize,
-                                       Skip: collector.args.PageSize * (i),
-                               },
-                               Input: reqData.Input,
-                       }
-                       go func() {
-                               errs <- collector.stepFetch(ctx, cancel, 
reqDataTemp)
-                       }()
+               collector.fetchPagesUndetermined(reqData)
+       }
+}
+
+// fetchPagesDetermined fetches data of all pages for APIs that return paging 
information
+func (collector *ApiCollector) fetchPagesDetermined(reqData *RequestData) {
+       // fetch first page
+       collector.fetchAsync(reqData, func(count int, body []byte, res 
*http.Response) error {
+               totalPages, err := collector.args.GetTotalPages(res, 
collector.args)
+               if err != nil {
+                       return fmt.Errorf("fetchPagesDetermined get totalPages 
faileds: %s", err.Error())
                }
-               for e := range errs {
-                       errCount++
-                       if err != nil || errCount == collector.args.Concurrency 
{
-                               err = e
-                               break
+               // spawn a none blocking go routine to fetch other pages
+               collector.args.ApiClient.NextTick(func() error {
+                       for page := 2; page <= totalPages; page++ {
+                               reqDataTemp := &RequestData{
+                                       Pager: &Pager{
+                                               Page: page,
+                                               Skip: collector.args.PageSize * 
(page - 1),
+                                               Size: collector.args.PageSize,
+                                       },
+                                       Input: reqData.Input,
+                               }
+                               collector.fetchAsync(reqDataTemp, nil)
                        }
+                       return nil
+               })
+               return nil
+       })
+}
+
+// fetchPagesUndetermined fetches data of all pages for APIs that do NOT 
return paging information
+func (collector *ApiCollector) fetchPagesUndetermined(reqData *RequestData) {
+       //logger := collector.args.Ctx.GetLogger()
+       //logger.Debug("fetch all pages in parallel with specified concurrency: 
%d", collector.args.Concurrency)
+       // if api doesn't return total number of pages, employ a step 
concurrent technique
+       // when `Concurrency` was set to 3:
+       // goroutine #1 fetches pages 1/4/7..
+       // goroutine #2 fetches pages 2/5/8...
+       // goroutine #3 fetches pages 3/6/9...
+       apiClient := collector.args.ApiClient
+       concurrency := collector.args.Concurrency
+       if concurrency == 0 {
+               // normally when a multi-pages api depends on a another 
resource, like jira changelogs depend on issue ids
+               // it tend to have less page, like 1 or 2 pages in total
+               if collector.args.Input != nil {
+                       concurrency = 2
+               } else {
+                       concurrency = apiClient.GetNumOfWorkers()
                }
        }
-       if err != nil {
-               return err
-       }
-       if collector.args.Input != nil {
-               collector.args.Ctx.IncProgress(1)
+       for i := 0; i < concurrency; i++ {
+               reqDataCopy := RequestData{
+                       Pager: &Pager{
+                               Page: i + 1,
+                               Size: collector.args.PageSize,
+                               Skip: collector.args.PageSize * (i),
+                       },
+                       Input: reqData.Input,
+               }
+               var collect func() error
+               collect = func() error {
+                       collector.fetchAsync(&reqDataCopy, func(count int, body 
[]byte, res *http.Response) error {
+                               if count < collector.args.PageSize {
+                                       return nil
+                               }
+                               apiClient.NextTick(func() error {
+                                       reqDataCopy.Pager.Skip += 
collector.args.PageSize
+                                       reqDataCopy.Pager.Page += concurrency
+                                       return collect()
+                               })
+                               return nil
+                       })
+                       return nil
+               }
+               apiClient.NextTick(collect)

Review Comment:
   It's not easy to understand ... I don't think we should run time-consuming
code inside NextTick.
   Maybe NextTick should not be defined in apiClient. Could we keep fetchAsync in
the main function (fetchPagesUndetermined) and only submit the task on the next
tick? Then we could just define submitTaskNextTick in apiClient.



##########
Makefile:
##########
@@ -51,9 +51,13 @@ configure-dev:
 commit:
        git cz
 
+mock:
+       rm -rf mocks
+       mockery --all --unroll-variadic=false

Review Comment:
   Does this automatically create mocks for all interfaces?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to