This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 61005cd1e fix: move extract existing graphql data to general position
(#4921)
61005cd1e is described below
commit 61005cd1e4cf3a5c600bf0e99e3805382c32dc57
Author: Likyh <[email protected]>
AuthorDate: Thu Apr 13 21:17:15 2023 +0800
fix: move extract existing graphql data to general position (#4921)
Co-authored-by: abeizn <[email protected]>
---
backend/helpers/pluginhelper/api/api_collector.go | 2 +-
.../helpers/pluginhelper/api/graphql_collector.go | 22 ++++++++++++----------
.../github_graphql/tasks/account_collector.go | 5 ++++-
.../github_graphql/tasks/issue_collector.go | 5 ++++-
.../plugins/github_graphql/tasks/job_collector.go | 5 ++++-
.../plugins/github_graphql/tasks/pr_collector.go | 6 ++++--
6 files changed, 29 insertions(+), 16 deletions(-)
diff --git a/backend/helpers/pluginhelper/api/api_collector.go
b/backend/helpers/pluginhelper/api/api_collector.go
index 27b6b8c5c..845658bef 100644
--- a/backend/helpers/pluginhelper/api/api_collector.go
+++ b/backend/helpers/pluginhelper/api/api_collector.go
@@ -425,7 +425,7 @@ func (collector *ApiCollector) fetchAsync(reqData
*RequestData, handler func(int
items, err := collector.args.ResponseParser(res)
if err != nil {
if errors.Is(err, ErrFinishCollect) {
- logger.Info("a fetch stop by parser, reqInput:
#%d", reqData.Params)
+ logger.Info("a fetch stop by parser, reqInput:
#%s", reqData.Params)
handler = nil
} else {
return errors.Default.Wrap(err,
fmt.Sprintf("error parsing response from %s", apiUrl))
diff --git a/backend/helpers/pluginhelper/api/graphql_collector.go
b/backend/helpers/pluginhelper/api/graphql_collector.go
index de364cfd6..67e3eca80 100644
--- a/backend/helpers/pluginhelper/api/graphql_collector.go
+++ b/backend/helpers/pluginhelper/api/graphql_collector.go
@@ -134,16 +134,23 @@ func (collector *GraphqlCollector) Execute() errors.Error
{
if err != nil {
return errors.Default.Wrap(err, "error running auto-migrate")
}
+
+ divider := NewBatchSaveDivider(collector.args.Ctx,
collector.args.BatchSize, collector.table, collector.params)
+
// flush data if not incremental collection
- if !collector.args.Incremental {
+ if collector.args.Incremental {
+ // re-extract data for new transformation rules
+ err = collector.ExtractExistRawData(divider)
+ if err != nil {
+ collector.checkError(err)
+ }
+ } else {
err = db.Delete(&RawData{}, dal.From(collector.table),
dal.Where("params = ?", collector.params))
if err != nil {
return errors.Default.Wrap(err, "error deleting data
from collector")
}
}
- divider := NewBatchSaveDivider(collector.args.Ctx,
collector.args.BatchSize, collector.table, collector.params)
-
collector.args.Ctx.SetProgress(0, -1)
if collector.args.Input != nil {
iterator := collector.args.Input
@@ -208,11 +215,6 @@ func (collector *GraphqlCollector) exec(divider
*BatchSaveDivider, input interfa
SkipCursor: nil,
Size: collector.args.PageSize,
}
- err = collector.ExtractExistRawData(divider, reqData)
- if err != nil {
- collector.checkError(err)
- return
- }
if collector.args.GetPageInfo != nil {
collector.fetchOneByOne(divider, reqData)
} else {
@@ -280,7 +282,7 @@ func (collector *GraphqlCollector)
BatchSaveWithOrigin(divider *BatchSaveDivider
}
// ExtractExistRawData will extract data from existing data from raw layer if
increment
-func (collector *GraphqlCollector) ExtractExistRawData(divider
*BatchSaveDivider, reqData *GraphqlRequestData) errors.Error {
+func (collector *GraphqlCollector) ExtractExistRawData(divider
*BatchSaveDivider) errors.Error {
// load data from database
db := collector.args.Ctx.GetDal()
logger := collector.args.Ctx.GetLogger()
@@ -304,7 +306,7 @@ func (collector *GraphqlCollector)
ExtractExistRawData(divider *BatchSaveDivider
row := &RawData{}
// get the type of query and variables
- query, variables, _ := collector.args.BuildQuery(reqData)
+ query, variables, _ := collector.args.BuildQuery(nil)
// progress
collector.args.Ctx.SetProgress(0, -1)
diff --git a/backend/plugins/github_graphql/tasks/account_collector.go
b/backend/plugins/github_graphql/tasks/account_collector.go
index c7b2924a6..73d627239 100644
--- a/backend/plugins/github_graphql/tasks/account_collector.go
+++ b/backend/plugins/github_graphql/tasks/account_collector.go
@@ -101,8 +101,11 @@ func CollectAccount(taskCtx plugin.SubTaskContext)
errors.Error {
InputStep: 100,
GraphqlClient: data.GraphqlClient,
BuildQuery: func(reqData *helper.GraphqlRequestData)
(interface{}, map[string]interface{}, error) {
- accounts := reqData.Input.([]interface{})
query := &GraphqlQueryAccountWrapper{}
+ if reqData == nil {
+ return query, map[string]interface{}{}, nil
+ }
+ accounts := reqData.Input.([]interface{})
users := []map[string]interface{}{}
for _, iAccount := range accounts {
account := iAccount.(*SimpleAccount)
diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go
b/backend/plugins/github_graphql/tasks/issue_collector.go
index 53aafe66d..87928c421 100644
--- a/backend/plugins/github_graphql/tasks/issue_collector.go
+++ b/backend/plugins/github_graphql/tasks/issue_collector.go
@@ -116,13 +116,16 @@ func CollectIssue(taskCtx plugin.SubTaskContext)
errors.Error {
PageSize: 100,
Incremental: incremental,
BuildQuery: func(reqData *helper.GraphqlRequestData)
(interface{}, map[string]interface{}, error) {
+ query := &GraphqlQueryIssueWrapper{}
+ if reqData == nil {
+ return query, map[string]interface{}{}, nil
+ }
since := helper.DateTime{}
if incremental {
since = helper.DateTime{Time:
*collectorWithState.LatestState.LatestSuccessStart}
} else if collectorWithState.TimeAfter != nil {
since = helper.DateTime{Time:
*collectorWithState.TimeAfter}
}
- query := &GraphqlQueryIssueWrapper{}
ownerName := strings.Split(data.Options.Name, "/")
variables := map[string]interface{}{
"since": since,
diff --git a/backend/plugins/github_graphql/tasks/job_collector.go
b/backend/plugins/github_graphql/tasks/job_collector.go
index 7c992f4d6..27ec6274b 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -144,8 +144,11 @@ func CollectGraphqlJobs(taskCtx plugin.SubTaskContext)
errors.Error {
Incremental: incremental,
GraphqlClient: data.GraphqlClient,
BuildQuery: func(reqData *helper.GraphqlRequestData)
(interface{}, map[string]interface{}, error) {
- workflowRuns := reqData.Input.([]interface{})
query := &GraphqlQueryCheckRunWrapper{}
+ if reqData == nil {
+ return query, map[string]interface{}{}, nil
+ }
+ workflowRuns := reqData.Input.([]interface{})
checkSuiteIds := []map[string]interface{}{}
for _, iWorkflowRuns := range workflowRuns {
workflowRun :=
iWorkflowRuns.(*SimpleWorkflowRun)
diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go
b/backend/plugins/github_graphql/tasks/pr_collector.go
index 510bde4e9..20b3fa9e1 100644
--- a/backend/plugins/github_graphql/tasks/pr_collector.go
+++ b/backend/plugins/github_graphql/tasks/pr_collector.go
@@ -171,6 +171,9 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
*/
BuildQuery: func(reqData *api.GraphqlRequestData) (interface{},
map[string]interface{}, error) {
query := &GraphqlQueryPrWrapper{}
+ if reqData == nil {
+ return query, map[string]interface{}{}, nil
+ }
ownerName := strings.Split(data.Options.Name, "/")
variables := map[string]interface{}{
"pageSize": graphql.Int(reqData.Pager.Size),
@@ -191,12 +194,11 @@ func CollectPr(taskCtx plugin.SubTaskContext)
errors.Error {
results := make([]interface{}, 0, 1)
isFinish := false
for _, rawL := range prs {
- // collect all data even though in increment
mode because of existing data extracting
+ // collect data even though in incremental mode
because of updating existing data
if collectorWithState.TimeAfter != nil &&
!collectorWithState.TimeAfter.Before(rawL.UpdatedAt) {
isFinish = true
break
}
- //If this is a pr, ignore
githubPr, err := convertGithubPullRequest(rawL,
data.Options.ConnectionId, data.Options.GithubId)
if err != nil {
return nil, err