klesh commented on code in PR #2053:
URL: https://github.com/apache/incubator-devlake/pull/2053#discussion_r891096222
##########
plugins/helper/api_collector.go:
##########
@@ -131,144 +117,140 @@ func (collector *ApiCollector) Execute() error {
logger.Info("start api collection")
// make sure table is created
- db := collector.args.Ctx.GetDb()
- err := db.Table(collector.table).AutoMigrate(&RawData{})
+ db := collector.args.Ctx.GetDal()
+ err := db.AutoMigrate(&RawData{}, dal.From(collector.table))
if err != nil {
return err
}
// flush data if not incremental collection
if !collector.args.Incremental {
- err = db.Table(collector.table).Delete(&RawData{}, "params = ?", collector.params).Error
+ err = db.Delete(&RawData{}, dal.From(collector.table), dal.Where("params = ?", collector.params))
if err != nil {
return err
}
}
+ collector.args.Ctx.SetProgress(0, -1)
if collector.args.Input != nil {
- collector.args.Ctx.SetProgress(0, -1)
- // load all rows from iterator, and do multiple `exec` accordingly
- // TODO: this loads all records into memory, we need lazy-load
iterator := collector.args.Input
+ apiClient := collector.args.ApiClient
defer iterator.Close()
- // throttle input process speed so it can be canceled, create a channel to represent available slots
- slots := int(math.Ceil(collector.args.ApiClient.GetQps())) * 2
- if slots <= 0 {
- return fmt.Errorf("RateLimit can't use the 0 Qps")
- }
- slotsChan := make(chan bool, slots)
- defer close(slotsChan)
- for i := 0; i < slots; i++ {
- slotsChan <- true
- }
-
- errors := make(chan error, slots)
- defer close(errors)
-
- var wg sync.WaitGroup
- ctx := collector.args.Ctx.GetContext()
-
- out:
- for iterator.HasNext() {
- select {
- // canceled by user, stop
- case <-ctx.Done():
- err = ctx.Err()
- break out
- // obtain a slot
- case <-slotsChan:
- input, err := iterator.Fetch()
- if err != nil {
- break out
- }
- wg.Add(1)
- go func() {
- defer func() {
- wg.Done()
- recover() //nolint TODO: check the return and do log if not nil
- }()
- e := collector.exec(input)
- // propagate error
- if e != nil {
- errors <- e
- } else {
- // release 1 slot
- slotsChan <- true
- }
- }()
- case err = <-errors:
- break out
+ for iterator.HasNext() && !apiClient.HasError() {
+ input, err := iterator.Fetch()
+ if err != nil {
+ break
}
- }
- if err == nil {
- wg.Wait()
+ collector.exec(input)
}
} else {
// or we just did it once
- err = collector.exec(nil)
+ collector.exec(nil)
}
if err != nil {
return err
}
logger.Debug("wait for all async api to finished")
err = collector.args.ApiClient.WaitAsync()
- logger.Info("end api collection")
+ logger.Info("end api collection error: %w", err)
return err
}
-func (collector *ApiCollector) exec(input interface{}) error {
+func (collector *ApiCollector) exec(input interface{}) {
+ inputJson, err := json.Marshal(input)
+ if err != nil {
+ panic(err)
+ }
reqData := new(RequestData)
reqData.Input = input
- if collector.args.PageSize <= 0 {
- // collect detail of a record
- return collector.fetchAsync(reqData, collector.handleResponse(reqData))
+ reqData.InputJSON = inputJson
+ reqData.Pager = &Pager{
+ Page: 1,
+ Size: collector.args.PageSize,
}
- // collect multiple pages
- var err error
- if collector.args.GetTotalPages != nil {
- /* when total pages is available from api*/
- // fetch the very first page
- err = collector.fetchAsync(reqData, collector.handleResponseWithPages(reqData))
+ if collector.args.PageSize <= 0 {
+ collector.fetchAsync(reqData, nil)
+ } else if collector.args.GetTotalPages != nil {
+ collector.fetchPagesDetermined(reqData)
} else {
- // if api doesn't return total number of pages, employ a step concurrent technique
- // when `Concurrency` was set to 3:
- // goroutine #1 fetches pages 1/4/7..
- // goroutine #2 fetches pages 2/5/8...
- // goroutine #3 fetches pages 3/6/9...
- errs := make(chan error, collector.args.Concurrency)
- var errCount int
- // cancel can only be called when error occurs, because we are doomed anyway.
- ctx, cancel := context.WithCancel(collector.args.Ctx.GetContext())
- defer cancel()
- for i := 0; i < collector.args.Concurrency; i++ {
- reqDataTemp := RequestData{
- Pager: &Pager{
- Page: i + 1,
- Size: collector.args.PageSize,
- Skip: collector.args.PageSize * (i),
- },
- Input: reqData.Input,
- }
- go func() {
- errs <- collector.stepFetch(ctx, cancel, reqDataTemp)
- }()
+ collector.fetchPagesUndetermined(reqData)
+ }
+}
+
+// fetchPagesDetermined fetches data of all pages for APIs that return paging information
+func (collector *ApiCollector) fetchPagesDetermined(reqData *RequestData) {
+ // fetch first page
+ collector.fetchAsync(reqData, func(count int, body []byte, res *http.Response) error {
+ totalPages, err := collector.args.GetTotalPages(res, collector.args)
+ if err != nil {
+ return fmt.Errorf("fetchPagesDetermined get totalPages failed: %s", err.Error())
}
- for e := range errs {
- errCount++
- if err != nil || errCount == collector.args.Concurrency
{
- err = e
- break
+ // spawn a non-blocking goroutine to fetch the other pages
+ collector.args.ApiClient.NextTick(func() error {
+ for page := 2; page <= totalPages; page++ {
+ reqDataTemp := &RequestData{
+ Pager: &Pager{
+ Page: page,
+ Skip: collector.args.PageSize * (page - 1),
+ Size: collector.args.PageSize,
+ },
+ Input: reqData.Input,
+ }
+ collector.fetchAsync(reqDataTemp, nil)
}
+ return nil
+ })
+ return nil
+ })
+}
+
+// fetchPagesUndetermined fetches data of all pages for APIs that do NOT return paging information
+func (collector *ApiCollector) fetchPagesUndetermined(reqData *RequestData) {
+ //logger := collector.args.Ctx.GetLogger()
+ //logger.Debug("fetch all pages in parallel with specified concurrency: %d", collector.args.Concurrency)
+ // if api doesn't return total number of pages, employ a step-concurrent technique
+ // when `Concurrency` was set to 3:
+ // goroutine #1 fetches pages 1/4/7..
+ // goroutine #2 fetches pages 2/5/8...
+ // goroutine #3 fetches pages 3/6/9...
+ apiClient := collector.args.ApiClient
+ concurrency := collector.args.Concurrency
+ if concurrency == 0 {
+ // normally when a multi-page api depends on another resource, like jira changelogs depend on issue ids,
+ // it tends to have fewer pages, often just 1 or 2 in total
+ if collector.args.Input != nil {
+ concurrency = 2
+ } else {
+ concurrency = apiClient.GetNumOfWorkers()
}
}
- if err != nil {
- return err
- }
- if collector.args.Input != nil {
- collector.args.Ctx.IncProgress(1)
+ for i := 0; i < concurrency; i++ {
+ reqDataCopy := RequestData{
+ Pager: &Pager{
+ Page: i + 1,
+ Size: collector.args.PageSize,
+ Skip: collector.args.PageSize * (i),
+ },
+ Input: reqData.Input,
+ }
+ var collect func() error
+ collect = func() error {
+ collector.fetchAsync(&reqDataCopy, func(count int, body []byte, res *http.Response) error {
+ if count < collector.args.PageSize {
+ return nil
+ }
+ apiClient.NextTick(func() error {
+ reqDataCopy.Pager.Skip += collector.args.PageSize
+ reqDataCopy.Pager.Page += concurrency
+ return collect()
+ })
+ return nil
+ })
+ return nil
+ }
+ apiClient.NextTick(collect)
Review Comment:
Yes, we can, at least for what we have inside the `callback` function.
It is time-consuming only in terms of `Blocking`, so it is safe.
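To make that concrete, here is a minimal, self-contained sketch (not part of this PR) of the step-concurrent paging pattern that `fetchPagesUndetermined` describes: each worker reschedules itself through `NextTick` for its next page and stops once a page comes back with fewer than `PageSize` records. The `scheduler`, `fetchPage`, `pageSize`, and `totalRecords` names here are made up for illustration; only the scheduling shape is meant to mirror the real `ApiAsyncClient.NextTick` / `fetchAsync` flow.

```go
package main

import (
	"fmt"
	"sync"
)

// scheduler is a hypothetical stand-in for ApiAsyncClient: it only mimics
// the non-blocking NextTick scheduling used by the collector.
type scheduler struct{ wg sync.WaitGroup }

// NextTick schedules work without blocking the caller, like queueing a task on a worker pool.
func (s *scheduler) NextTick(task func() error) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		_ = task()
	}()
}

// WaitAsync blocks until every scheduled task has finished.
func (s *scheduler) WaitAsync() { s.wg.Wait() }

const pageSize = 100
const totalRecords = 730 // pretend the API holds 8 pages, the last one partial

// fetchPage simulates an API call: it returns how many records page `page` holds.
func fetchPage(page int) int {
	start := (page - 1) * pageSize
	if start >= totalRecords {
		return 0
	}
	if remaining := totalRecords - start; remaining < pageSize {
		return remaining
	}
	return pageSize
}

func main() {
	s := &scheduler{}
	concurrency := 3
	// Step-concurrent paging: worker #1 walks pages 1/4/7..., #2 walks 2/5/8..., #3 walks 3/6/9...
	// Each worker keeps rescheduling itself via NextTick until a page comes back short,
	// which signals there are no more pages for that worker.
	for i := 0; i < concurrency; i++ {
		page := i + 1
		var step func() error
		step = func() error {
			count := fetchPage(page)
			fmt.Printf("page %d -> %d records\n", page, count)
			if count < pageSize {
				return nil // short (or empty) page: this worker is done
			}
			page += concurrency
			s.NextTick(step)
			return nil
		}
		s.NextTick(step)
	}
	s.WaitAsync()
}
```

The short-page stopping rule is what lets the collector page through APIs that never report a total page count, and because each worker only schedules its next page after the current callback returns, a worker's pages are fetched strictly in order.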