klesh commented on code in PR #2053:
URL: https://github.com/apache/incubator-devlake/pull/2053#discussion_r891415991


##########
plugins/helper/worker_scheduler.go:
##########
@@ -22,133 +22,155 @@ import (
        "fmt"
        "os"
        "sync"
+       "sync/atomic"
        "time"
 
+       "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/utils"
        "github.com/panjf2000/ants/v2"
 )
 
+// WorkerScheduler runs asynchronous tasks in parallel with throttling support
 type WorkerScheduler struct {
-       waitGroup    *sync.WaitGroup
+       waitGroup    sync.WaitGroup
        pool         *ants.Pool
-       subPool      *ants.Pool
        ticker       *time.Ticker
-       workerErrors *[]error
+       workerErrors []error
        ctx          context.Context
+       mu           sync.Mutex
+       counter      int32
+       logger       core.Logger
 }
 
-// NewWorkerScheduler 创建一个并行执行的调度器,控制最大运行数和每秒最大运行数量
-// NewWorkerScheduler Create a parallel scheduler to control the maximum 
number of runs and the maximum number of runs per second
-// 注意: task执行是无序的
-// Warning: task execution is out of order
-func NewWorkerScheduler(workerNum int, maxWork int, maxWorkDuration 
time.Duration, ctx context.Context, maxRetry int) (*WorkerScheduler, error) {
-       var waitGroup sync.WaitGroup
-       workerErrors := make([]error, 0)
-       pWorkerErrors := &workerErrors
-       var mux sync.Mutex
-       pool, err := ants.NewPool(workerNum, ants.WithPanicHandler(func(i 
interface{}) {
-               mux.Lock()
-               defer mux.Unlock()
-               workerErrors = append(*pWorkerErrors, i.(error))
-               pWorkerErrors = &workerErrors
-       }))
-       if err != nil {
-               return nil, err
+var callframeEnabled = os.Getenv("ASYNC_CF") == "true"
+
+// NewWorkerScheduler creates a WorkerScheduler
+func NewWorkerScheduler(
+       workerNum int,
+       maxWork int,
+       maxWorkDuration time.Duration,
+       ctx context.Context,
+       maxRetry int,
+       logger core.Logger,
+) (*WorkerScheduler, error) {
+       if maxWork <= 0 {
+               return nil, fmt.Errorf("maxWork less than 1")
        }
-       subPool, err := ants.NewPool(workerNum*maxRetry, 
ants.WithPanicHandler(func(i interface{}) {
-               mux.Lock()
-               defer mux.Unlock()
-               workerErrors = append(*pWorkerErrors, i.(error))
-               pWorkerErrors = &workerErrors
+       if maxWorkDuration <= 0 {
+               return nil, fmt.Errorf("maxWorkDuration less than 1")
+       }
+       s := &WorkerScheduler{
+               ctx:    ctx,
+               ticker: time.NewTicker(maxWorkDuration / 
time.Duration(maxWork)),
+               logger: logger,
+       }
+       pool, err := ants.NewPool(workerNum, ants.WithPanicHandler(func(i 
interface{}) {
+               s.mu.Lock()
+               defer s.mu.Unlock()
+               s.workerErrors = append(s.workerErrors, i.(error))
        }))
        if err != nil {
                return nil, err
        }
-       var ticker *time.Ticker
-       if maxWork > 0 {
-               ticker = time.NewTicker(maxWorkDuration / 
time.Duration(maxWork))
-       }
-       scheduler := &WorkerScheduler{
-               waitGroup:    &waitGroup,
-               pool:         pool,
-               subPool:      subPool,
-               ticker:       ticker,
-               workerErrors: pWorkerErrors,
-               ctx:          ctx,
-       }
-       return scheduler, nil
+       s.pool = pool
+       return s, nil
 }
 
-func (s *WorkerScheduler) Submit(task func() error, pool ...*ants.Pool) error {
-       select {
-       case <-s.ctx.Done():
-               return s.ctx.Err()
-       default:
-       }
-       s.waitGroup.Add(1)
+// SubmitBlocking enqueues a async task to ants, the task will be executed in 
future when timing is right.
+// It doesn't return error because it wouldn't be any when with a Blocking 
semantic, returned error does nothing but
+// causing confusion, more often, people thought it is returned by the task.
+// Since it is async task, the callframes would not be available for 
production mode, you can export Environment
+// Varaible ASYNC_CF=true to enable callframes capturing when debugging.
+// IMPORTANT: do NOT call SubmitBlocking inside the async task, it is likely 
to cause a deadlock, call
+// SubmitNonBlocking instead when number of tasks is relatively small.
+func (s *WorkerScheduler) SubmitBlocking(task func() error) {
        // this is expensive, enable by EnvVar
-       cf := "set Environment Varaible ASYNC_CF=true to enable callframes 
information"
-       if os.Getenv("ASYNC_CF") == "true" {
-               cf = utils.GatherCallFrames()
-       }
-       var currentPool *ants.Pool
-       if pool == nil {
-               currentPool = s.pool
-       } else {
-               currentPool = pool[0]
+       cf := s.gatherCallFrames()
+       // to make sure task is done
+       if len(s.workerErrors) > 0 {
+               // not point to continue
+               return
        }
+       s.waitGroup.Add(1)
+       err := s.pool.Submit(func() {
+               defer s.waitGroup.Done()
+
+               id := atomic.AddInt32(&s.counter, 1)
+               s.logger.Debug("schedulerJob >>> %d started", id)
+               defer s.logger.Debug("schedulerJob <<< %d ended", id)
 
-       return currentPool.Submit(func() {
                var err error
-               defer s.waitGroup.Done()
-               defer func() {
-                       if err == nil {
-                               r := recover()
-                               if r != nil {
-                                       err = fmt.Errorf("%s", r)
-                               }
-                       }
-                       if err != nil {
-                               panic(fmt.Errorf("%s\n%s", err, cf))
-                       }
-               }()
-               if pool == nil && s.ticker != nil {
-                       for s.subPool.Running() != 0 {
-                               <-s.ticker.C
-                       }
-               }
-               if s.ticker != nil {
-                       <-s.ticker.C
+               defer s.gatherError(err, cf)
+
+               if len(s.workerErrors) > 0 {
+                       // not point to continue
+                       return
                }
+               // wait for rate limit throttling
                select {
                case <-s.ctx.Done():
                        err = s.ctx.Err()
-               default:
+               case <-s.ticker.C:
                        err = task()
                }
        })
+       // failed to submit, note that this is not task erro
+       if err != nil {
+               s.gatherError(err, cf)
+       }
+}
+
+func (s *WorkerScheduler) gatherCallFrames() string {
+       cf := "set Environment Varaible ASYNC_CF=true to enable callframes 
capturing"
+       if callframeEnabled {
+               cf = utils.GatherCallFrames(1)
+       }
+       return cf
+}
+
+func (s *WorkerScheduler) gatherError(err error, callframs string) {
+       if err == nil {
+               r := recover()
+               if r != nil {
+                       err = fmt.Errorf("%s\n%s", r, callframs)
+               }
+       }
+       if err != nil {
+               s.mu.Lock()
+               defer s.mu.Unlock()
+               s.workerErrors = append(s.workerErrors, err)
+       }
 }
 
-func (s *WorkerScheduler) WaitUntilFinish() error {
+// NextTick enqueues task in a NonBlocking manner, you should only call this 
method within task submitted by
+// SubmitBlocking method
+// IMPORTANT: do NOT call this method with a huge number of tasks, it is 
likely to eat up all available memory
+func (s *WorkerScheduler) NextTick(task func() error) {
+       cf := s.gatherCallFrames()
+       // to make sure task will be enqueued
+       s.waitGroup.Add(1)
+       go func() {
+               var err error
+               defer s.waitGroup.Done()
+               defer s.gatherError(err, cf)
+               err = task()
+       }()
+}
+
+// Wait blocks current go-routine until all workers returned
+func (s *WorkerScheduler) Wait() error {

Review Comment:
   No, I don't agree. `delay` and `sleep` do mean sleeping, no problem there.
   But `wait` means wait: it can of course wait for a `sleep` to wake up, but
   not necessarily.
   Besides, `WaitGroup` uses the term `Wait`.
   I think it is only your personal preference.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to