This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 3e2ad748 Unit test for Total Execute of api_collect. (#1998)
3e2ad748 is described below
commit 3e2ad74827f8d48eecf4350e162aa37c50bee88c
Author: mappjzc <[email protected]>
AuthorDate: Wed May 25 19:07:30 2022 +0800
Unit test for Total Execute of api_collect. (#1998)
* fix: apicollector stuck problem
* fix: remove redundant code
* refactor: fetch pages in another gorutine
* fix: add page collector into wait group
* test: unit test for api_collector total execute
TestExecute_Total
Nddtfjiang <[email protected]>
* fix: fix unit test for handleResponseWithPages fixing
add mock Add and Done of ApiAsyncClient
Nddtfjiang <[email protected]>
* fix: add timeout setting for testing
Add SetTimeOut
Add -timeout at makefile
Nddtfjiang <[email protected]>
Co-authored-by: zhangliang <[email protected]>
---
Makefile | 6 +-
plugins/helper/api_async_client.go | 8 ++
plugins/helper/api_async_client_test.go | 19 ++--
plugins/helper/api_collector.go | 30 +++---
plugins/helper/api_collector_test.go | 172 +++++++++++++++++++++++++++++---
plugins/helper/worker_scheduler.go | 8 ++
6 files changed, 205 insertions(+), 38 deletions(-)
diff --git a/Makefile b/Makefile
index d662033f..561c6170 100644
--- a/Makefile
+++ b/Makefile
@@ -54,13 +54,13 @@ commit:
test: unit-test e2e-test
unit-test: build
- set -e; for m in $$(go list ./... | egrep -v 'test|models|e2e'); do
echo $$m; go test -gcflags=all=-l -v $$m; done
+ set -e; for m in $$(go list ./... | egrep -v 'test|models|e2e'); do
echo $$m; go test -timeout 60s -gcflags=all=-l -v $$m; done
e2e-test: build
- PLUGIN_DIR=$(shell readlink -f bin/plugins) go test -v ./test/...
+ PLUGIN_DIR=$(shell readlink -f bin/plugins) go test -timeout 300s -v
./test/...
e2e-plugins:
- export ENV_PATH=$(shell readlink -f .env); set -e; for m in $$(go list
./plugins/... | egrep 'e2e'); do echo $$m; go test -gcflags=all=-l -v $$m; done
+ export ENV_PATH=$(shell readlink -f .env); set -e; for m in $$(go list
./plugins/... | egrep 'e2e'); do echo $$m; go test -timeout 300s
-gcflags=all=-l -v $$m; done
real-e2e-test:
PLUGIN_DIR=$(shell readlink -f bin/plugins) go test -v ./e2e/...
diff --git a/plugins/helper/api_async_client.go
b/plugins/helper/api_async_client.go
index 5a6bb35c..20d4f894 100644
--- a/plugins/helper/api_async_client.go
+++ b/plugins/helper/api_async_client.go
@@ -185,11 +185,19 @@ func (apiClient *ApiAsyncClient) WaitAsync() error {
func (apiClient *ApiAsyncClient) GetQps() float64 {
return apiClient.qps
}
+func (apiClient *ApiAsyncClient) Add(delta int) {
+ apiClient.scheduler.Add(delta)
+}
+func (apiClient *ApiAsyncClient) Done() {
+ apiClient.scheduler.Done()
+}
type RateLimitedApiClient interface {
GetAsync(path string, query url.Values, header http.Header, handler
ApiAsyncCallback) error
WaitAsync() error
GetQps() float64
+ Add(delta int)
+ Done()
}
var _ RateLimitedApiClient = (*ApiAsyncClient)(nil)
diff --git a/plugins/helper/api_async_client_test.go
b/plugins/helper/api_async_client_test.go
index 169d0ff4..dff9c5f8 100644
--- a/plugins/helper/api_async_client_test.go
+++ b/plugins/helper/api_async_client_test.go
@@ -89,13 +89,7 @@ func GetConfigForTest(basepath string) *viper.Viper {
return v
}
-// Create an AsyncApiClient object for test
-func CreateTestAsyncApiClient(t *testing.T) (*ApiAsyncClient, error) {
- // create rate limit calculator
- rateLimiter := &ApiRateLimitCalculator{
- UserRateLimitPerHour: 36000, // ten times each seconed
- }
-
+func CreateTestAsyncApiClientWithRateLimitAndCtx(t *testing.T, rateLimiter
*ApiRateLimitCalculator, ctx context.Context) (*ApiAsyncClient, error) {
// set the function of create new default taskcontext for the
AsyncApiClient
gm := gomonkey.ApplyFunc(NewDefaultTaskContext, func(
cfg *viper.Viper,
@@ -111,7 +105,7 @@ func CreateTestAsyncApiClient(t *testing.T)
(*ApiAsyncClient, error) {
cfg: cfg,
logger: &DefaultLogger{},
db: db,
- ctx: context.Background(),
+ ctx: ctx,
name: "Test",
data: nil,
progress: progress,
@@ -131,6 +125,15 @@ func CreateTestAsyncApiClient(t *testing.T)
(*ApiAsyncClient, error) {
return CreateAsyncApiClient(taskCtx, apiClient, rateLimiter)
}
+// Create an AsyncApiClient object for test
+func CreateTestAsyncApiClient(t *testing.T) (*ApiAsyncClient, error) {
+ // create rate limit calculator
+ rateLimiter := &ApiRateLimitCalculator{
+ UserRateLimitPerHour: 36000, // ten times each seconed
+ }
+ return CreateTestAsyncApiClientWithRateLimitAndCtx(t, rateLimiter,
context.Background())
+}
+
// go test -gcflags=all=-l -run ^TestWaitAsync_EmptyWork
func TestWaitAsync_EmptyWork(t *testing.T) {
asyncApiClient, _ := CreateTestAsyncApiClient(t)
diff --git a/plugins/helper/api_collector.go b/plugins/helper/api_collector.go
index 679533ca..447b5eb9 100644
--- a/plugins/helper/api_collector.go
+++ b/plugins/helper/api_collector.go
@@ -412,20 +412,24 @@ func (collector *ApiCollector)
handleResponseWithPages(reqData *RequestData) Api
collector.args.Ctx.SetProgress(1, totalPages)
}
// fetch other pages in parallel
- for page := 2; page <= totalPages; page++ {
- reqDataTemp := &RequestData{
- Pager: &Pager{
- Page: page,
- Size: collector.args.PageSize,
- Skip: collector.args.PageSize * (page -
1),
- },
- Input: reqData.Input,
- }
- e = collector.fetchAsync(reqDataTemp,
collector.handleResponse(reqDataTemp))
- if e != nil {
- return e
+ collector.args.ApiClient.Add(1)
+ go func() {
+ defer func() {
+ collector.args.ApiClient.Done()
+ recover()
+ }()
+ for page := 2; page <= totalPages; page++ {
+ reqDataTemp := &RequestData{
+ Pager: &Pager{
+ Page: page,
+ Size: collector.args.PageSize,
+ Skip: collector.args.PageSize *
(page - 1),
+ },
+ Input: reqData.Input,
+ }
+ _ = collector.fetchAsync(reqDataTemp,
collector.handleResponse(reqDataTemp))
}
- }
+ }()
return nil
}
}
diff --git a/plugins/helper/api_collector_test.go
b/plugins/helper/api_collector_test.go
index 7d04ada1..532b2293 100644
--- a/plugins/helper/api_collector_test.go
+++ b/plugins/helper/api_collector_test.go
@@ -27,6 +27,7 @@ import (
"net/http"
"net/url"
"reflect"
+ "runtime/debug"
"sync/atomic"
"testing"
"time"
@@ -92,6 +93,7 @@ var TestDataCountNotFull int = 50
var TestPage int = 110
var TestSkip int = 100100
var TestSize int = 116102
+var TestTimeOut time.Duration = time.Duration(10) * time.Second
var Cancel context.CancelFunc
@@ -120,6 +122,35 @@ func AssertBaseResponse(t *testing.T, A *http.Response, B
*http.Response) {
assert.Equal(t, A.ProtoMinor, B.ProtoMinor)
}
+func SetTimeOut(timeout time.Duration, handleer func()) error {
+ stack := string(debug.Stack())
+ t := time.After(timeout)
+ done := make(chan bool)
+ go func() {
+ defer func() {
+ if r := recover(); r != nil {
+ fmt.Printf("%s\r\n", stack)
+ fmt.Printf("%v\r\n", r)
+ }
+ }()
+
+ if handleer != nil {
+ handleer()
+ }
+ done <- true
+ }()
+
+ select {
+ case <-t:
+ return fmt.Errorf("[time:%s]\r\n[Time limit for %f
seconed]\r\n[stack]\r\n%s\r\n",
+ time.Now().String(),
+ float64(timeout)/float64(time.Second),
+ stack)
+ case <-done:
+ return nil
+ }
+}
+
func AddBodyData(res *http.Response, count int) {
data := "["
for i := 0; i < count; i++ {
@@ -531,6 +562,17 @@ func TestHandleResponseWithPages(t *testing.T) {
})
defer gs.Reset()
+ NeedWait := int64(0)
+ gad := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "Add",
func(apiClient *ApiAsyncClient, delta int) {
+ atomic.AddInt64(&NeedWait, int64(delta))
+ })
+ defer gad.Reset()
+
+ gdo := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "Done",
func(apiClient *ApiAsyncClient) {
+ atomic.AddInt64(&NeedWait, -1)
+ })
+ defer gdo.Reset()
+
// build request Input
reqData := new(RequestData)
reqData.Input = TestTableData
@@ -545,6 +587,12 @@ func TestHandleResponseWithPages(t *testing.T) {
// run testing
err := handle(res, nil)
+
+ // wait run finished
+ for atomic.LoadInt64(&NeedWait) > 0 {
+ time.Sleep(time.Millisecond)
+ }
+
assert.Equal(t, err, nil)
for i := 2; i <= TestTotalPage; i++ {
assert.True(t, pages[i], i)
@@ -763,9 +811,12 @@ func TestStepFetch_Cancel(t *testing.T) {
Cancel()
}()
- err := apiCollector.stepFetch(ctx, cancel, *reqData)
+ err := SetTimeOut(TestTimeOut, func() {
+ err := apiCollector.stepFetch(ctx, cancel, *reqData)
+ assert.Equal(t, err, fmt.Errorf("context canceled"))
+ })
+ assert.Equal(t, err, nil)
- assert.Equal(t, err, fmt.Errorf("context canceled"))
}
func TestExecWithOutPageSize(t *testing.T) {
@@ -880,13 +931,15 @@ func TestExec_Cancel(t *testing.T) {
Cancel()
}()
- // run testing
- err := apiCollector.exec(TestTableData)
+ err := SetTimeOut(TestTimeOut, func() {
+ // run testing
+ err := apiCollector.exec(TestTableData)
+ assert.Equal(t, err, nil)
+ for i := 2; i <= TestTotalPage; i++ {
+ assert.True(t, pages[i], i)
+ }
+ })
assert.Equal(t, err, nil)
-
- for i := 2; i <= TestTotalPage; i++ {
- assert.True(t, pages[i], i)
- }
}
func TestExecute(t *testing.T) {
@@ -991,11 +1044,102 @@ func TestExecute_Cancel(t *testing.T) {
Cancel()
}()
- // run testing
- err := apiCollector.Execute()
- assert.Equal(t, err, fmt.Errorf("context canceled"))
+ err := SetTimeOut(TestTimeOut, func() {
+ // run testing
+ err := apiCollector.Execute()
+ assert.Equal(t, err, fmt.Errorf("context canceled"))
- input := apiCollector.args.Input.(*TestIterator)
- assert.Equal(t, input.hasNextTimes >= input.fetchTimes, true)
- assert.Equal(t, input.closeTimes > 0, true)
+ input := apiCollector.args.Input.(*TestIterator)
+ assert.Equal(t, input.hasNextTimes >= input.fetchTimes, true)
+ assert.Equal(t, input.closeTimes > 0, true)
+ })
+ assert.Equal(t, err, nil)
+}
+
+func TestExecute_Total(t *testing.T) {
+ MockDB(t)
+ defer UnMockDB()
+ apiCollector, _ := CreateTestApiCollector()
+ // less count for more quick test
+ TestDataCount = 10
+ // ReLimit the workNum to test the block
+ reWorkNum := 1
+
+ apiCollector.args.Input = &TestIterator{
+ data: *TestTableData,
+ count: TestDataCount,
+ hasNextTimes: 0,
+ fetchTimes: 0,
+ closeTimes: 0,
+ unlimit: false,
+ }
+
+ gt.Reset()
+ gt = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Table", func(db
*gorm.DB, name string, args ...interface{}) *gorm.DB {
+ assert.Equal(t, name, "_raw_"+TestTableData.TableName())
+ return db
+ },
+ )
+ defer gw.Reset()
+
+ gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector),
"saveRawData", func(collector *ApiCollector, res *http.Response, input
interface{}) (int, error) {
+ items, err := collector.args.ResponseParser(res)
+ assert.Equal(t, err, nil)
+ for _, v := range items {
+ jsondata, err := json.Marshal(v)
+ assert.Equal(t, err, nil)
+ assert.Equal(t, string(jsondata), TestRawMessage)
+ }
+ assert.Equal(t, input, TestTableData)
+ AssertBaseResponse(t, res, &TestHttpResponse_Suc)
+ return len(items), nil
+ })
+ defer gs.Reset()
+
+ gin := gomonkey.ApplyMethod(reflect.TypeOf(&DefaultLogger{}), "Info",
func(_ *DefaultLogger, _ string, _ ...interface{}) {
+ })
+ defer gin.Reset()
+
+ gdo := gomonkey.ApplyMethod(reflect.TypeOf(&ApiClient{}), "Do", func(
+ apiClient *ApiClient,
+ method string,
+ path string,
+ query url.Values,
+ body interface{},
+ headers http.Header,
+ ) (*http.Response, error) {
+ res := TestHttpResponse_Suc
+
+ AddBodyData(&res, TestDataCount)
+ SetUrl(&res, TestUrl)
+
+ return &res, nil
+ })
+ defer gdo.Reset()
+
+ var gse *gomonkey.Patches
+ gse = gomonkey.ApplyFunc(NewWorkerScheduler, func(workerNum int,
maxWork int, maxWorkDuration time.Duration, ctx context.Context, maxRetry int)
(*WorkerScheduler, error) {
+ gse.Reset()
+ workerNum = reWorkNum
+ return NewWorkerScheduler(workerNum, maxWork, maxWorkDuration,
ctx, maxRetry)
+ })
+ defer gse.Reset()
+
+ // create rate limit calculator
+ rateLimiter := &ApiRateLimitCalculator{
+ UserRateLimitPerHour: 360000000, // 100000 times each seconed
+ }
+
+ apiCollector.args.ApiClient, _ =
CreateTestAsyncApiClientWithRateLimitAndCtx(t, rateLimiter,
apiCollector.args.Ctx.GetContext())
+
+ err := SetTimeOut(TestTimeOut, func() {
+ err := apiCollector.Execute()
+ assert.Equal(t, err, nil)
+
+ input := apiCollector.args.Input.(*TestIterator)
+ assert.Equal(t, input.fetchTimes, TestDataCount)
+ assert.Equal(t, input.hasNextTimes >= input.fetchTimes, true)
+ assert.Equal(t, input.closeTimes > 0, true)
+ })
+ assert.Equal(t, err, nil)
}
diff --git a/plugins/helper/worker_scheduler.go
b/plugins/helper/worker_scheduler.go
index b6b951d3..e42bfe33 100644
--- a/plugins/helper/worker_scheduler.go
+++ b/plugins/helper/worker_scheduler.go
@@ -144,3 +144,11 @@ func (s *WorkerScheduler) Release() {
s.ticker.Stop()
}
}
+
+func (s *WorkerScheduler) Add(delta int) {
+ s.waitGroup.Add(delta)
+}
+
+func (s *WorkerScheduler) Done() {
+ s.waitGroup.Done()
+}