This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e2ad748 Unit test for Total Execute of api_collect. (#1998)
3e2ad748 is described below

commit 3e2ad74827f8d48eecf4350e162aa37c50bee88c
Author: mappjzc <[email protected]>
AuthorDate: Wed May 25 19:07:30 2022 +0800

    Unit test for Total Execute of api_collect. (#1998)
    
    * fix: apicollector stuck problem
    
    * fix: remove redundant code
    
    * refactor: fetch pages in another goroutine
    
    * fix: add page collector into wait group
    
    * test: unit test for api_collector total execute
    
    TestExecute_Total
    
    Nddtfjiang <[email protected]>
    
    * fix: fix unit test for handleResponseWithPages fixing
    
    add mock Add and Done of ApiAsyncClient
    
    Nddtfjiang <[email protected]>
    
    * fix: add timeout setting for testing
    
    Add SetTimeOut
    Add -timeout to the go test commands in the Makefile
    
    Nddtfjiang <[email protected]>
    
    Co-authored-by: zhangliang <[email protected]>
---
 Makefile                                |   6 +-
 plugins/helper/api_async_client.go      |   8 ++
 plugins/helper/api_async_client_test.go |  19 ++--
 plugins/helper/api_collector.go         |  30 +++---
 plugins/helper/api_collector_test.go    | 172 +++++++++++++++++++++++++++++---
 plugins/helper/worker_scheduler.go      |   8 ++
 6 files changed, 205 insertions(+), 38 deletions(-)

diff --git a/Makefile b/Makefile
index d662033f..561c6170 100644
--- a/Makefile
+++ b/Makefile
@@ -54,13 +54,13 @@ commit:
 test: unit-test e2e-test
 
 unit-test: build
-       set -e; for m in $$(go list ./... | egrep -v 'test|models|e2e'); do 
echo $$m; go test -gcflags=all=-l -v $$m; done
+       set -e; for m in $$(go list ./... | egrep -v 'test|models|e2e'); do 
echo $$m; go test -timeout 60s -gcflags=all=-l -v $$m; done
 
 e2e-test: build
-       PLUGIN_DIR=$(shell readlink -f bin/plugins) go test -v ./test/...
+       PLUGIN_DIR=$(shell readlink -f bin/plugins) go test -timeout 300s -v 
./test/...
 
 e2e-plugins:
-       export ENV_PATH=$(shell readlink -f .env); set -e; for m in $$(go list 
./plugins/... | egrep 'e2e'); do echo $$m; go test -gcflags=all=-l -v $$m; done
+       export ENV_PATH=$(shell readlink -f .env); set -e; for m in $$(go list 
./plugins/... | egrep 'e2e'); do echo $$m; go test -timeout 300s 
-gcflags=all=-l -v $$m; done
 
 real-e2e-test:
        PLUGIN_DIR=$(shell readlink -f bin/plugins) go test -v ./e2e/...
diff --git a/plugins/helper/api_async_client.go 
b/plugins/helper/api_async_client.go
index 5a6bb35c..20d4f894 100644
--- a/plugins/helper/api_async_client.go
+++ b/plugins/helper/api_async_client.go
@@ -185,11 +185,19 @@ func (apiClient *ApiAsyncClient) WaitAsync() error {
 func (apiClient *ApiAsyncClient) GetQps() float64 {
        return apiClient.qps
 }
+func (apiClient *ApiAsyncClient) Add(delta int) {
+       apiClient.scheduler.Add(delta)
+}
+func (apiClient *ApiAsyncClient) Done() {
+       apiClient.scheduler.Done()
+}
 
 type RateLimitedApiClient interface {
        GetAsync(path string, query url.Values, header http.Header, handler 
ApiAsyncCallback) error
        WaitAsync() error
        GetQps() float64
+       Add(delta int)
+       Done()
 }
 
 var _ RateLimitedApiClient = (*ApiAsyncClient)(nil)
diff --git a/plugins/helper/api_async_client_test.go 
b/plugins/helper/api_async_client_test.go
index 169d0ff4..dff9c5f8 100644
--- a/plugins/helper/api_async_client_test.go
+++ b/plugins/helper/api_async_client_test.go
@@ -89,13 +89,7 @@ func GetConfigForTest(basepath string) *viper.Viper {
        return v
 }
 
-// Create an AsyncApiClient object for test
-func CreateTestAsyncApiClient(t *testing.T) (*ApiAsyncClient, error) {
-       // create rate limit calculator
-       rateLimiter := &ApiRateLimitCalculator{
-               UserRateLimitPerHour: 36000, // ten times each seconed
-       }
-
+func CreateTestAsyncApiClientWithRateLimitAndCtx(t *testing.T, rateLimiter 
*ApiRateLimitCalculator, ctx context.Context) (*ApiAsyncClient, error) {
        // set the function of create new default taskcontext for the 
AsyncApiClient
        gm := gomonkey.ApplyFunc(NewDefaultTaskContext, func(
                cfg *viper.Viper,
@@ -111,7 +105,7 @@ func CreateTestAsyncApiClient(t *testing.T) 
(*ApiAsyncClient, error) {
                                cfg:      cfg,
                                logger:   &DefaultLogger{},
                                db:       db,
-                               ctx:      context.Background(),
+                               ctx:      ctx,
                                name:     "Test",
                                data:     nil,
                                progress: progress,
@@ -131,6 +125,15 @@ func CreateTestAsyncApiClient(t *testing.T) 
(*ApiAsyncClient, error) {
        return CreateAsyncApiClient(taskCtx, apiClient, rateLimiter)
 }
 
+// Create an AsyncApiClient object for test
+func CreateTestAsyncApiClient(t *testing.T) (*ApiAsyncClient, error) {
+       // create rate limit calculator
+       rateLimiter := &ApiRateLimitCalculator{
+               UserRateLimitPerHour: 36000, // ten times each seconed
+       }
+       return CreateTestAsyncApiClientWithRateLimitAndCtx(t, rateLimiter, 
context.Background())
+}
+
 // go test -gcflags=all=-l -run ^TestWaitAsync_EmptyWork
 func TestWaitAsync_EmptyWork(t *testing.T) {
        asyncApiClient, _ := CreateTestAsyncApiClient(t)
diff --git a/plugins/helper/api_collector.go b/plugins/helper/api_collector.go
index 679533ca..447b5eb9 100644
--- a/plugins/helper/api_collector.go
+++ b/plugins/helper/api_collector.go
@@ -412,20 +412,24 @@ func (collector *ApiCollector) 
handleResponseWithPages(reqData *RequestData) Api
                        collector.args.Ctx.SetProgress(1, totalPages)
                }
                // fetch other pages in parallel
-               for page := 2; page <= totalPages; page++ {
-                       reqDataTemp := &RequestData{
-                               Pager: &Pager{
-                                       Page: page,
-                                       Size: collector.args.PageSize,
-                                       Skip: collector.args.PageSize * (page - 
1),
-                               },
-                               Input: reqData.Input,
-                       }
-                       e = collector.fetchAsync(reqDataTemp, 
collector.handleResponse(reqDataTemp))
-                       if e != nil {
-                               return e
+               collector.args.ApiClient.Add(1)
+               go func() {
+                       defer func() {
+                               collector.args.ApiClient.Done()
+                               recover()
+                       }()
+                       for page := 2; page <= totalPages; page++ {
+                               reqDataTemp := &RequestData{
+                                       Pager: &Pager{
+                                               Page: page,
+                                               Size: collector.args.PageSize,
+                                               Skip: collector.args.PageSize * 
(page - 1),
+                                       },
+                                       Input: reqData.Input,
+                               }
+                               _ = collector.fetchAsync(reqDataTemp, 
collector.handleResponse(reqDataTemp))
                        }
-               }
+               }()
                return nil
        }
 }
diff --git a/plugins/helper/api_collector_test.go 
b/plugins/helper/api_collector_test.go
index 7d04ada1..532b2293 100644
--- a/plugins/helper/api_collector_test.go
+++ b/plugins/helper/api_collector_test.go
@@ -27,6 +27,7 @@ import (
        "net/http"
        "net/url"
        "reflect"
+       "runtime/debug"
        "sync/atomic"
        "testing"
        "time"
@@ -92,6 +93,7 @@ var TestDataCountNotFull int = 50
 var TestPage int = 110
 var TestSkip int = 100100
 var TestSize int = 116102
+var TestTimeOut time.Duration = time.Duration(10) * time.Second
 
 var Cancel context.CancelFunc
 
@@ -120,6 +122,35 @@ func AssertBaseResponse(t *testing.T, A *http.Response, B 
*http.Response) {
        assert.Equal(t, A.ProtoMinor, B.ProtoMinor)
 }
 
+func SetTimeOut(timeout time.Duration, handleer func()) error {
+       stack := string(debug.Stack())
+       t := time.After(timeout)
+       done := make(chan bool)
+       go func() {
+               defer func() {
+                       if r := recover(); r != nil {
+                               fmt.Printf("%s\r\n", stack)
+                               fmt.Printf("%v\r\n", r)
+                       }
+               }()
+
+               if handleer != nil {
+                       handleer()
+               }
+               done <- true
+       }()
+
+       select {
+       case <-t:
+               return fmt.Errorf("[time:%s]\r\n[Time limit for %f 
seconed]\r\n[stack]\r\n%s\r\n",
+                       time.Now().String(),
+                       float64(timeout)/float64(time.Second),
+                       stack)
+       case <-done:
+               return nil
+       }
+}
+
 func AddBodyData(res *http.Response, count int) {
        data := "["
        for i := 0; i < count; i++ {
@@ -531,6 +562,17 @@ func TestHandleResponseWithPages(t *testing.T) {
        })
        defer gs.Reset()
 
+       NeedWait := int64(0)
+       gad := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "Add", 
func(apiClient *ApiAsyncClient, delta int) {
+               atomic.AddInt64(&NeedWait, int64(delta))
+       })
+       defer gad.Reset()
+
+       gdo := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "Done", 
func(apiClient *ApiAsyncClient) {
+               atomic.AddInt64(&NeedWait, -1)
+       })
+       defer gdo.Reset()
+
        // build request Input
        reqData := new(RequestData)
        reqData.Input = TestTableData
@@ -545,6 +587,12 @@ func TestHandleResponseWithPages(t *testing.T) {
 
        // run testing
        err := handle(res, nil)
+
+       // wait run finished
+       for atomic.LoadInt64(&NeedWait) > 0 {
+               time.Sleep(time.Millisecond)
+       }
+
        assert.Equal(t, err, nil)
        for i := 2; i <= TestTotalPage; i++ {
                assert.True(t, pages[i], i)
@@ -763,9 +811,12 @@ func TestStepFetch_Cancel(t *testing.T) {
                Cancel()
        }()
 
-       err := apiCollector.stepFetch(ctx, cancel, *reqData)
+       err := SetTimeOut(TestTimeOut, func() {
+               err := apiCollector.stepFetch(ctx, cancel, *reqData)
+               assert.Equal(t, err, fmt.Errorf("context canceled"))
+       })
+       assert.Equal(t, err, nil)
 
-       assert.Equal(t, err, fmt.Errorf("context canceled"))
 }
 
 func TestExecWithOutPageSize(t *testing.T) {
@@ -880,13 +931,15 @@ func TestExec_Cancel(t *testing.T) {
                Cancel()
        }()
 
-       // run testing
-       err := apiCollector.exec(TestTableData)
+       err := SetTimeOut(TestTimeOut, func() {
+               // run testing
+               err := apiCollector.exec(TestTableData)
+               assert.Equal(t, err, nil)
+               for i := 2; i <= TestTotalPage; i++ {
+                       assert.True(t, pages[i], i)
+               }
+       })
        assert.Equal(t, err, nil)
-
-       for i := 2; i <= TestTotalPage; i++ {
-               assert.True(t, pages[i], i)
-       }
 }
 
 func TestExecute(t *testing.T) {
@@ -991,11 +1044,102 @@ func TestExecute_Cancel(t *testing.T) {
                Cancel()
        }()
 
-       // run testing
-       err := apiCollector.Execute()
-       assert.Equal(t, err, fmt.Errorf("context canceled"))
+       err := SetTimeOut(TestTimeOut, func() {
+               // run testing
+               err := apiCollector.Execute()
+               assert.Equal(t, err, fmt.Errorf("context canceled"))
 
-       input := apiCollector.args.Input.(*TestIterator)
-       assert.Equal(t, input.hasNextTimes >= input.fetchTimes, true)
-       assert.Equal(t, input.closeTimes > 0, true)
+               input := apiCollector.args.Input.(*TestIterator)
+               assert.Equal(t, input.hasNextTimes >= input.fetchTimes, true)
+               assert.Equal(t, input.closeTimes > 0, true)
+       })
+       assert.Equal(t, err, nil)
+}
+
+func TestExecute_Total(t *testing.T) {
+       MockDB(t)
+       defer UnMockDB()
+       apiCollector, _ := CreateTestApiCollector()
+       // less count for more quick test
+       TestDataCount = 10
+       // ReLimit the workNum to test the block
+       reWorkNum := 1
+
+       apiCollector.args.Input = &TestIterator{
+               data:         *TestTableData,
+               count:        TestDataCount,
+               hasNextTimes: 0,
+               fetchTimes:   0,
+               closeTimes:   0,
+               unlimit:      false,
+       }
+
+       gt.Reset()
+       gt = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Table", func(db 
*gorm.DB, name string, args ...interface{}) *gorm.DB {
+               assert.Equal(t, name, "_raw_"+TestTableData.TableName())
+               return db
+       },
+       )
+       defer gw.Reset()
+
+       gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), 
"saveRawData", func(collector *ApiCollector, res *http.Response, input 
interface{}) (int, error) {
+               items, err := collector.args.ResponseParser(res)
+               assert.Equal(t, err, nil)
+               for _, v := range items {
+                       jsondata, err := json.Marshal(v)
+                       assert.Equal(t, err, nil)
+                       assert.Equal(t, string(jsondata), TestRawMessage)
+               }
+               assert.Equal(t, input, TestTableData)
+               AssertBaseResponse(t, res, &TestHttpResponse_Suc)
+               return len(items), nil
+       })
+       defer gs.Reset()
+
+       gin := gomonkey.ApplyMethod(reflect.TypeOf(&DefaultLogger{}), "Info", 
func(_ *DefaultLogger, _ string, _ ...interface{}) {
+       })
+       defer gin.Reset()
+
+       gdo := gomonkey.ApplyMethod(reflect.TypeOf(&ApiClient{}), "Do", func(
+               apiClient *ApiClient,
+               method string,
+               path string,
+               query url.Values,
+               body interface{},
+               headers http.Header,
+       ) (*http.Response, error) {
+               res := TestHttpResponse_Suc
+
+               AddBodyData(&res, TestDataCount)
+               SetUrl(&res, TestUrl)
+
+               return &res, nil
+       })
+       defer gdo.Reset()
+
+       var gse *gomonkey.Patches
+       gse = gomonkey.ApplyFunc(NewWorkerScheduler, func(workerNum int, 
maxWork int, maxWorkDuration time.Duration, ctx context.Context, maxRetry int) 
(*WorkerScheduler, error) {
+               gse.Reset()
+               workerNum = reWorkNum
+               return NewWorkerScheduler(workerNum, maxWork, maxWorkDuration, 
ctx, maxRetry)
+       })
+       defer gse.Reset()
+
+       // create rate limit calculator
+       rateLimiter := &ApiRateLimitCalculator{
+               UserRateLimitPerHour: 360000000, // 100000 times each seconed
+       }
+
+       apiCollector.args.ApiClient, _ = 
CreateTestAsyncApiClientWithRateLimitAndCtx(t, rateLimiter, 
apiCollector.args.Ctx.GetContext())
+
+       err := SetTimeOut(TestTimeOut, func() {
+               err := apiCollector.Execute()
+               assert.Equal(t, err, nil)
+
+               input := apiCollector.args.Input.(*TestIterator)
+               assert.Equal(t, input.fetchTimes, TestDataCount)
+               assert.Equal(t, input.hasNextTimes >= input.fetchTimes, true)
+               assert.Equal(t, input.closeTimes > 0, true)
+       })
+       assert.Equal(t, err, nil)
 }
diff --git a/plugins/helper/worker_scheduler.go 
b/plugins/helper/worker_scheduler.go
index b6b951d3..e42bfe33 100644
--- a/plugins/helper/worker_scheduler.go
+++ b/plugins/helper/worker_scheduler.go
@@ -144,3 +144,11 @@ func (s *WorkerScheduler) Release() {
                s.ticker.Stop()
        }
 }
+
+func (s *WorkerScheduler) Add(delta int) {
+       s.waitGroup.Add(delta)
+}
+
+func (s *WorkerScheduler) Done() {
+       s.waitGroup.Done()
+}

Reply via email to