This is an automated email from the ASF dual-hosted git repository. klesh pushed a commit to branch kw-7852-components-field-length in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit 5a6c3f933c828fb0f0d2172802e9a396795212ab Author: Klesh Wong <[email protected]> AuthorDate: Mon Aug 5 22:08:03 2024 +0800 refactor: remove extraction logic from github graphql collector --- backend/helpers/pluginhelper/api/batch_save.go | 2 +- .../helpers/pluginhelper/api/graphql_collector.go | 205 +++++---------------- .../github_graphql/tasks/account_collector.go | 12 +- .../github_graphql/tasks/deployment_collector.go | 10 +- .../github_graphql/tasks/issue_collector.go | 10 +- .../plugins/github_graphql/tasks/job_collector.go | 20 +- .../plugins/github_graphql/tasks/pr_collector.go | 13 +- .../plugins/github_graphql/tasks/pr_extractor.go | 137 +++++++------- .../github_graphql/tasks/release_collector.go | 17 +- 9 files changed, 164 insertions(+), 262 deletions(-) diff --git a/backend/helpers/pluginhelper/api/batch_save.go b/backend/helpers/pluginhelper/api/batch_save.go index 623082536..42a71c18d 100644 --- a/backend/helpers/pluginhelper/api/batch_save.go +++ b/backend/helpers/pluginhelper/api/batch_save.go @@ -50,7 +50,7 @@ type BatchSave struct { // NewBatchSave creates a new BatchSave instance func NewBatchSave(basicRes context.BasicRes, slotType reflect.Type, size int, tableName ...string) (*BatchSave, errors.Error) { if slotType.Kind() != reflect.Ptr { - return nil, errors.Default.New("slotType must be a pointer") + panic(errors.Default.New("slotType must be a pointer")) } db := basicRes.GetDal() primaryKey := db.GetPrimaryKeyFields(slotType) diff --git a/backend/helpers/pluginhelper/api/graphql_collector.go b/backend/helpers/pluginhelper/api/graphql_collector.go index d0adbbabb..f1d74a7cb 100644 --- a/backend/helpers/pluginhelper/api/graphql_collector.go +++ b/backend/helpers/pluginhelper/api/graphql_collector.go @@ -26,7 +26,6 @@ import ( "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" - "github.com/apache/incubator-devlake/core/models/common" plugin "github.com/apache/incubator-devlake/core/plugin" "github.com/merico-dev/graphql" ) @@ -79,8 +78,8 @@ type GraphqlCollectorArgs struct { GetPageInfo func(query interface{}, args *GraphqlCollectorArgs) (*GraphqlQueryPageInfo, error) BatchSize int // one of ResponseParser and ResponseParserEvenWhenDataErrors is required to parse response - ResponseParser func(query interface{}, variables map[string]interface{}) ([]interface{}, error) - ResponseParserWithDataErrors func(query interface{}, variables map[string]interface{}, dataErrors []graphql.DataError) ([]interface{}, error) + ResponseParser func(queryWrapper interface{}) ([]json.RawMessage, errors.Error) + IgnoreQueryErrors bool } // GraphqlCollector help you collect data from Graphql services @@ -88,6 +87,7 @@ type GraphqlCollector struct { *RawDataSubTask args *GraphqlCollectorArgs workerErrors []error + batchSave *BatchSave } // ErrFinishCollect is an error which will finish this collector @@ -105,19 +105,25 @@ func NewGraphqlCollector(args GraphqlCollectorArgs) (*GraphqlCollector, errors.E if args.GraphqlClient == nil { return nil, errors.Default.New("ApiClient is required") } - if args.ResponseParser == nil && args.ResponseParserWithDataErrors == nil { + if args.ResponseParser == nil { return nil, errors.Default.New("one of ResponseParser and ResponseParserWithDataErrors is required") } - apiCollector := &GraphqlCollector{ - RawDataSubTask: rawDataSubTask, - args: &args, - } if args.BatchSize == 0 { - args.BatchSize = 500 + args.BatchSize = 100 } if args.InputStep == 0 { args.InputStep = 1 } + apiCollector := &GraphqlCollector{ + RawDataSubTask: rawDataSubTask, + args: &args, + batchSave: errors.Must1(NewBatchSave( + args.Ctx, + reflect.TypeOf(&RawData{}), + args.BatchSize, + rawDataSubTask.table, + )), + } return apiCollector, nil } @@ -133,21 +139,13 @@ func (collector *GraphqlCollector) Execute() errors.Error { return errors.Default.Wrap(err, "error running auto-migrate") } - divider := NewBatchSaveDivider(collector.args.Ctx, collector.args.BatchSize, collector.table, collector.params) - isIncremental := collector.args.Incremental syncPolicy := collector.args.Ctx.TaskContext().SyncPolicy() if syncPolicy != nil && syncPolicy.FullSync { isIncremental = false } // flush data if not incremental collection - if isIncremental { - // re extract data for new scope config - err = collector.ExtractExistRawData(divider) - if err != nil { - collector.checkError(err) - } - } else { + if !isIncremental { err = db.Delete(&RawData{}, dal.From(collector.table), dal.Where("params = ?", collector.params)) if err != nil { return errors.Default.Wrap(err, "error deleting data from collector") @@ -166,7 +164,7 @@ func (collector *GraphqlCollector) Execute() errors.Error { collector.checkError(err) break } - collector.exec(divider, input) + collector.exec(input) } } else { for !collector.HasError() { @@ -182,12 +180,12 @@ func (collector *GraphqlCollector) Execute() errors.Error { if inputs == nil { break } - collector.exec(divider, inputs) + collector.exec(inputs) } } } else { // or we just did it once - collector.exec(divider, nil) + collector.exec(nil) } logger.Debug("wait for all async api to finished") @@ -202,11 +200,11 @@ func (collector *GraphqlCollector) Execute() errors.Error { logger.Info("ended api collection without error") } - err = divider.Close() + err = collector.batchSave.Close() return err } -func (collector *GraphqlCollector) exec(divider *BatchSaveDivider, input interface{}) { +func (collector *GraphqlCollector) exec(input interface{}) { inputJson, err := json.Marshal(input) if err != nil { collector.checkError(errors.Default.Wrap(err, `input can not be marshal to json`)) @@ -219,14 +217,14 @@ func (collector *GraphqlCollector) exec(divider *BatchSaveDivider, input interfa Size: collector.args.PageSize, } if collector.args.GetPageInfo != nil { - collector.fetchOneByOne(divider, reqData) + collector.fetchOneByOne(reqData) } else { - collector.fetchAsync(divider, reqData, nil) + collector.fetchAsync(reqData, nil) } } // fetchOneByOne fetches data of all pages for APIs that return paging information -func (collector *GraphqlCollector) fetchOneByOne(divider *BatchSaveDivider, reqData *GraphqlRequestData) { +func (collector *GraphqlCollector) fetchOneByOne(reqData *GraphqlRequestData) { // fetch first page var fetchNextPage func(query interface{}) errors.Error fetchNextPage = func(query interface{}) errors.Error { @@ -247,119 +245,16 @@ func (collector *GraphqlCollector) fetchOneByOne(divider *BatchSaveDivider, reqD Input: reqData.Input, InputJSON: reqData.InputJSON, } - collector.fetchAsync(divider, reqDataTemp, fetchNextPage) + collector.fetchAsync(reqDataTemp, fetchNextPage) return nil }, collector.checkError) } return nil } - collector.fetchAsync(divider, reqData, fetchNextPage) -} - -// BatchSaveWithOrigin save the results and fill raw data origin for them -func (collector *GraphqlCollector) BatchSaveWithOrigin(divider *BatchSaveDivider, results []interface{}, row *RawData) errors.Error { - // batch save divider - RAW_DATA_ORIGIN := "RawDataOrigin" - for _, result := range results { - // get the batch operator for the specific type - batch, err := divider.ForType(reflect.TypeOf(result)) - if err != nil { - return err - } - // set raw data origin field - origin := reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN) - if origin.IsValid() && origin.IsZero() { - origin.Set(reflect.ValueOf(common.RawDataOrigin{ - RawDataTable: collector.table, - RawDataId: row.ID, - RawDataParams: row.Params, - })) - } - // records get saved into db when slots were max outed - err = batch.Add(result) - if err != nil { - return errors.Default.Wrap(err, "error adding result to batch") - } - } - return nil -} - -// ExtractExistRawData will extract data from existing data from raw layer if increment -func (collector *GraphqlCollector) ExtractExistRawData(divider *BatchSaveDivider) errors.Error { - // load data from database - db := collector.args.Ctx.GetDal() - logger := collector.args.Ctx.GetLogger() - - clauses := []dal.Clause{ - dal.From(collector.table), - dal.Where("params = ?", collector.params), - dal.Orderby("id ASC"), - } - - count, err := db.Count(clauses...) - if err != nil { - return errors.Default.Wrap(err, "error getting count of clauses") - } - cursor, err := db.Cursor(clauses...) - if err != nil { - return errors.Default.Wrap(err, "error running DB query") - } - logger.Info("get data from %s where params=%s and got %d", collector.table, collector.params, count) - defer cursor.Close() - - // progress - collector.args.Ctx.SetProgress(0, -1) - ctx := collector.args.Ctx.GetContext() - // iterate all rows - for cursor.Next() { - select { - case <-ctx.Done(): - return errors.Convert(ctx.Err()) - default: - } - // get the type of query and variables. For each iteration, the query should be a different object - query, variables, _ := collector.args.BuildQuery(nil) - row := &RawData{} - err = db.Fetch(cursor, row) - if err != nil { - return errors.Default.Wrap(err, "error fetching row") - } - - err = errors.Convert(json.Unmarshal(row.Data, &query)) - if err != nil { - return errors.Default.Wrap(err, `graphql collector unmarshal query failed`) - } - err = errors.Convert(json.Unmarshal(row.Input, &variables)) - if err != nil { - return errors.Default.Wrap(err, `variables in graphql query can not unmarshal from json`) - } - - var results []interface{} - if collector.args.ResponseParserWithDataErrors != nil { - results, err = errors.Convert01(collector.args.ResponseParserWithDataErrors(query, variables, nil)) - } else { - results, err = errors.Convert01(collector.args.ResponseParser(query, variables)) - } - if err != nil { - if errors.Is(err, ErrFinishCollect) { - logger.Info("existing data parser return ErrFinishCollect, but skip. rawId: #%d", row.ID) - } else { - return errors.Default.Wrap(err, "error calling plugin Extract implementation") - } - } - err = collector.BatchSaveWithOrigin(divider, results, row) - if err != nil { - return err - } - - collector.args.Ctx.IncProgress(1) - } - - // save the last batches - return divider.Close() + collector.fetchAsync(reqData, fetchNextPage) } -func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData *GraphqlRequestData, handler func(query interface{}) errors.Error) { +func (collector *GraphqlCollector) fetchAsync(reqData *GraphqlRequestData, handler func(query interface{}) errors.Error) { if reqData.Pager == nil { reqData.Pager = &CursorPager{ SkipCursor: nil, @@ -373,6 +268,7 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData } logger := collector.args.Ctx.GetLogger() + db := collector.args.Ctx.GetDal() dataErrors, err := collector.args.GraphqlClient.Query(query, variables) if err != nil { if err == context.Canceled { @@ -384,7 +280,7 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData return } if len(dataErrors) > 0 { - if collector.args.ResponseParserWithDataErrors == nil { + if !collector.args.IgnoreQueryErrors { for _, dataError := range dataErrors { collector.checkError(errors.Default.Wrap(dataError, `graphql query got error`)) } @@ -394,50 +290,37 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData } defer logger.Debug("fetchAsync >>> done for %v %v", query, variables) - paramsBytes, err := json.Marshal(query) - if err != nil { - collector.checkError(errors.Default.Wrap(err, `graphql collector marshal query failed`)) - return - } - db := collector.args.Ctx.GetDal() queryStr, _ := graphql.ConstructQuery(query, variables) variablesJson, err := json.Marshal(variables) if err != nil { collector.checkError(errors.Default.Wrap(err, `variables in graphql query can not marshal to json`)) return } - row := &RawData{ - Params: collector.params, - Data: paramsBytes, - Url: queryStr, - Input: variablesJson, - } - err = db.Create(row, dal.From(collector.table)) - if err != nil { - collector.checkError(errors.Default.Wrap(err, `not created row table in graphql collector`)) - return - } - var results []interface{} - if collector.args.ResponseParserWithDataErrors != nil { - results, err = collector.args.ResponseParserWithDataErrors(query, variables, dataErrors) - } else { - results, err = collector.args.ResponseParser(query, variables) + results, err := collector.args.ResponseParser(query) + for _, result := range results { + row := &RawData{ + Params: collector.params, + Data: result, + Url: queryStr, + Input: variablesJson, + } + // collector.batchSave.Add(row) + err = db.Create(row, dal.From(collector.table)) + if err != nil { + collector.checkError(errors.Default.Wrap(err, `not created row table in graphql collector`)) + return + } } if err != nil { if errors.Is(err, ErrFinishCollect) { - logger.Info("collector finish by parser, rawId: #%d", row.ID) + logger.Info("collector finish by parser") handler = nil } else { collector.checkError(errors.Default.Wrap(err, `not parsed response in graphql collector`)) return } } - err = collector.BatchSaveWithOrigin(divider, results, row) - if err != nil { - collector.checkError(err) - return - } collector.args.Ctx.IncProgress(1) if handler != nil { diff --git a/backend/plugins/github_graphql/tasks/account_collector.go b/backend/plugins/github_graphql/tasks/account_collector.go index ad16068c4..27d71b65f 100644 --- a/backend/plugins/github_graphql/tasks/account_collector.go +++ b/backend/plugins/github_graphql/tasks/account_collector.go @@ -18,6 +18,7 @@ limitations under the License. package tasks import ( + "encoding/json" "reflect" "github.com/apache/incubator-devlake/core/dal" @@ -118,13 +119,14 @@ func CollectAccount(taskCtx plugin.SubTaskContext) errors.Error { } return query, variables, nil }, - ResponseParserWithDataErrors: func(iQuery interface{}, variables map[string]interface{}, dataErrors []graphql.DataError) ([]interface{}, error) { - for _, dataError := range dataErrors { - // log and ignore - taskCtx.GetLogger().Warn(dataError, `query user get error but ignore`) + ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) { + query := queryWrapper.(*GraphqlQueryAccountWrapper) + for _, rawL := range query.Users { + messages = append(messages, errors.Must1(json.Marshal(rawL))) } - return nil, nil + return }, + IgnoreQueryErrors: true, }) if err != nil { diff --git a/backend/plugins/github_graphql/tasks/deployment_collector.go b/backend/plugins/github_graphql/tasks/deployment_collector.go index 99ddc493f..b28c8deca 100644 --- a/backend/plugins/github_graphql/tasks/deployment_collector.go +++ b/backend/plugins/github_graphql/tasks/deployment_collector.go @@ -18,6 +18,7 @@ limitations under the License. package tasks import ( + "encoding/json" "strings" "time" @@ -129,15 +130,16 @@ func CollectDeployments(taskCtx plugin.SubTaskContext) errors.Error { query := iQuery.(*GraphqlQueryDeploymentWrapper) return query.Repository.Deployments.PageInfo, nil }, - ResponseParser: func(iQuery interface{}, variables map[string]interface{}) ([]interface{}, error) { - query := iQuery.(*GraphqlQueryDeploymentWrapper) + ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) { + query := queryWrapper.(*GraphqlQueryDeploymentWrapper) deployments := query.Repository.Deployments.Deployments for _, rawL := range deployments { if apiCollector.GetSince() != nil && !apiCollector.GetSince().Before(rawL.UpdatedAt) { - return nil, helper.ErrFinishCollect + return messages, helper.ErrFinishCollect } + messages = append(messages, errors.Must1(json.Marshal(rawL))) } - return nil, nil + return }, }) if err != nil { diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go b/backend/plugins/github_graphql/tasks/issue_collector.go index dbf1b1e95..f2d27cc5f 100644 --- a/backend/plugins/github_graphql/tasks/issue_collector.go +++ b/backend/plugins/github_graphql/tasks/issue_collector.go @@ -18,6 +18,7 @@ limitations under the License. package tasks import ( + "encoding/json" "strings" "time" @@ -115,15 +116,16 @@ func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error { query := iQuery.(*GraphqlQueryIssueWrapper) return query.Repository.IssueList.PageInfo, nil }, - ResponseParser: func(iQuery interface{}, variables map[string]interface{}) ([]interface{}, error) { - query := iQuery.(*GraphqlQueryIssueWrapper) + ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) { + query := queryWrapper.(*GraphqlQueryIssueWrapper) issues := query.Repository.IssueList.Issues for _, rawL := range issues { if apiCollector.GetSince() != nil && !apiCollector.GetSince().Before(rawL.UpdatedAt) { - return nil, helper.ErrFinishCollect + return messages, helper.ErrFinishCollect } + messages = append(messages, errors.Must1(json.Marshal(rawL))) } - return nil, nil + return }, }) if err != nil { diff --git a/backend/plugins/github_graphql/tasks/job_collector.go b/backend/plugins/github_graphql/tasks/job_collector.go index effac75ff..10694e56a 100644 --- a/backend/plugins/github_graphql/tasks/job_collector.go +++ b/backend/plugins/github_graphql/tasks/job_collector.go @@ -18,6 +18,7 @@ limitations under the License. package tasks import ( + "encoding/json" "reflect" "time" @@ -156,13 +157,22 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { } return query, variables, nil }, - ResponseParserWithDataErrors: func(iQuery interface{}, variables map[string]interface{}, dataErrors []graphql.DataError) ([]interface{}, error) { - for _, dataError := range dataErrors { - // log and ignore - taskCtx.GetLogger().Warn(dataError, `query check run get error but ignore`) + ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) { + query := queryWrapper.(*GraphqlQueryCheckRunWrapper) + for _, node := range query.Node { + checkRun := node.CheckSuite.CheckRuns.Nodes[0] + updatedAt := checkRun.StartedAt + if checkRun.CompletedAt != nil { + updatedAt = checkRun.CompletedAt + } + if apiCollector.GetSince() != nil && !apiCollector.GetSince().Before(*updatedAt) { + return messages, helper.ErrFinishCollect + } + messages = append(messages, errors.Must1(json.Marshal(node))) } - return nil, nil + return }, + IgnoreQueryErrors: true, }) if err != nil { return err diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go b/backend/plugins/github_graphql/tasks/pr_collector.go index a129d3b31..9e457e5b9 100644 --- a/backend/plugins/github_graphql/tasks/pr_collector.go +++ b/backend/plugins/github_graphql/tasks/pr_collector.go @@ -18,6 +18,7 @@ limitations under the License. package tasks import ( + "encoding/json" "strings" "time" @@ -170,6 +171,7 @@ func CollectPrs(taskCtx plugin.SubTaskContext) errors.Error { return err } + since := apiCollector.GetSince() err = apiCollector.InitGraphQLCollector(api.GraphqlCollectorArgs{ GraphqlClient: data.GraphqlClient, PageSize: 10, @@ -194,15 +196,16 @@ func CollectPrs(taskCtx plugin.SubTaskContext) errors.Error { query := iQuery.(*GraphqlQueryPrWrapper) return query.Repository.PullRequests.PageInfo, nil }, - ResponseParser: func(iQuery interface{}, variables map[string]interface{}) ([]interface{}, error) { - query := iQuery.(*GraphqlQueryPrWrapper) + ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) { + query := queryWrapper.(*GraphqlQueryPrWrapper) prs := query.Repository.PullRequests.Prs for _, rawL := range prs { - if apiCollector.GetSince() != nil && !apiCollector.GetSince().Before(rawL.CreatedAt) { - return nil, api.ErrFinishCollect + if since != nil && since.After(rawL.UpdatedAt) { + return messages, api.ErrFinishCollect } + messages = append(messages, errors.Must1(json.Marshal(rawL))) } - return nil, nil + return }, }) if err != nil { diff --git a/backend/plugins/github_graphql/tasks/pr_extractor.go b/backend/plugins/github_graphql/tasks/pr_extractor.go index 0b4885328..59bc72cdb 100644 --- a/backend/plugins/github_graphql/tasks/pr_extractor.go +++ b/backend/plugins/github_graphql/tasks/pr_extractor.go @@ -67,88 +67,85 @@ func ExtractPrs(taskCtx plugin.SubTaskContext) errors.Error { Table: RAW_PRS_TABLE, }, Extract: func(row *api.RawData) ([]interface{}, errors.Error) { - apiPr := &GraphqlQueryPrWrapper{} - err := errors.Convert(json.Unmarshal(row.Data, apiPr)) + rawL := &GraphqlQueryPr{} + err := errors.Convert(json.Unmarshal(row.Data, rawL)) if err != nil { return nil, err } - prs := apiPr.Repository.PullRequests.Prs results := make([]interface{}, 0, 1) - for _, rawL := range prs { - githubPr, err := convertGithubPullRequest(rawL, data.Options.ConnectionId, data.Options.GithubId) - if err != nil { - return nil, err + githubPr, err := convertGithubPullRequest(rawL, data.Options.ConnectionId, data.Options.GithubId) + if err != nil { + return nil, err + } + extractGraphqlPreAccount(&results, rawL.Author, data.Options.GithubId, data.Options.ConnectionId) + for _, label := range rawL.Labels.Nodes { + results = append(results, &models.GithubPrLabel{ + ConnectionId: data.Options.ConnectionId, + PullId: githubPr.GithubId, + LabelName: label.Name, + }) + // if pr.Type has not been set and prType is set in .env, process the below + if labelTypeRegex != nil && labelTypeRegex.MatchString(label.Name) { + githubPr.Type = label.Name } - extractGraphqlPreAccount(&results, rawL.Author, data.Options.GithubId, data.Options.ConnectionId) - for _, label := range rawL.Labels.Nodes { - results = append(results, &models.GithubPrLabel{ - ConnectionId: data.Options.ConnectionId, - PullId: githubPr.GithubId, - LabelName: label.Name, - }) - // if pr.Type has not been set and prType is set in .env, process the below - if labelTypeRegex != nil && labelTypeRegex.MatchString(label.Name) { - githubPr.Type = label.Name - } - // if pr.Component has not been set and prComponent is set in .env, process - if labelComponentRegex != nil && labelComponentRegex.MatchString(label.Name) { - githubPr.Component = label.Name + // if pr.Component has not been set and prComponent is set in .env, process + if labelComponentRegex != nil && labelComponentRegex.MatchString(label.Name) { + githubPr.Component = label.Name - } - } - results = append(results, githubPr) - - for _, apiPullRequestReview := range rawL.Reviews.Nodes { - if apiPullRequestReview.State != "PENDING" { - githubPrReview := &models.GithubPrReview{ - ConnectionId: data.Options.ConnectionId, - GithubId: apiPullRequestReview.DatabaseId, - Body: apiPullRequestReview.Body, - State: apiPullRequestReview.State, - CommitSha: apiPullRequestReview.Commit.Oid, - GithubSubmitAt: apiPullRequestReview.SubmittedAt, - - PullRequestId: githubPr.GithubId, - } - - if apiPullRequestReview.Author != nil { - githubPrReview.AuthorUserId = apiPullRequestReview.Author.Id - githubPrReview.AuthorUsername = apiPullRequestReview.Author.Login - extractGraphqlPreAccount(&results, apiPullRequestReview.Author, data.Options.GithubId, data.Options.ConnectionId) - } - - results = append(results, githubPrReview) - } } - for _, apiReviewRequests := range rawL.ReviewRequests.Nodes { - githubReviewRequests := &models.GithubReviewer{ - ConnectionId: data.Options.ConnectionId, + } + results = append(results, githubPr) + + for _, apiPullRequestReview := range rawL.Reviews.Nodes { + if apiPullRequestReview.State != "PENDING" { + githubPrReview := &models.GithubPrReview{ + ConnectionId: data.Options.ConnectionId, + GithubId: apiPullRequestReview.DatabaseId, + Body: apiPullRequestReview.Body, + State: apiPullRequestReview.State, + CommitSha: apiPullRequestReview.Commit.Oid, + GithubSubmitAt: apiPullRequestReview.SubmittedAt, + PullRequestId: githubPr.GithubId, - ReviewerId: apiReviewRequests.RequestedReviewer.User.Id, - Username: apiReviewRequests.RequestedReviewer.User.Login, } - results = append(results, githubReviewRequests) - } - for _, apiPullRequestCommit := range rawL.Commits.Nodes { - githubCommit, err := convertPullRequestCommit(apiPullRequestCommit) - if err != nil { - return nil, err - } - results = append(results, githubCommit) - - githubPullRequestCommit := &models.GithubPrCommit{ - ConnectionId: data.Options.ConnectionId, - CommitSha: apiPullRequestCommit.Commit.Oid, - PullRequestId: githubPr.GithubId, - CommitAuthorName: githubCommit.AuthorName, - CommitAuthorEmail: githubCommit.AuthorEmail, - CommitAuthoredDate: githubCommit.AuthoredDate, + if apiPullRequestReview.Author != nil { + githubPrReview.AuthorUserId = apiPullRequestReview.Author.Id + githubPrReview.AuthorUsername = apiPullRequestReview.Author.Login + extractGraphqlPreAccount(&results, apiPullRequestReview.Author, data.Options.GithubId, data.Options.ConnectionId) } - results = append(results, githubPullRequestCommit) - extractGraphqlPreAccount(&results, apiPullRequestCommit.Commit.Author.User, data.Options.GithubId, data.Options.ConnectionId) + + results = append(results, githubPrReview) + } + } + for _, apiReviewRequests := range rawL.ReviewRequests.Nodes { + githubReviewRequests := &models.GithubReviewer{ + ConnectionId: data.Options.ConnectionId, + PullRequestId: githubPr.GithubId, + ReviewerId: apiReviewRequests.RequestedReviewer.User.Id, + Username: apiReviewRequests.RequestedReviewer.User.Login, + } + results = append(results, githubReviewRequests) + } + + for _, apiPullRequestCommit := range rawL.Commits.Nodes { + githubCommit, err := convertPullRequestCommit(apiPullRequestCommit) + if err != nil { + return nil, err + } + results = append(results, githubCommit) + + githubPullRequestCommit := &models.GithubPrCommit{ + ConnectionId: data.Options.ConnectionId, + CommitSha: apiPullRequestCommit.Commit.Oid, + PullRequestId: githubPr.GithubId, + CommitAuthorName: githubCommit.AuthorName, + CommitAuthorEmail: githubCommit.AuthorEmail, + CommitAuthoredDate: githubCommit.AuthoredDate, } + results = append(results, githubPullRequestCommit) + extractGraphqlPreAccount(&results, apiPullRequestCommit.Commit.Author.User, data.Options.GithubId, data.Options.ConnectionId) } return results, nil }, @@ -161,7 +158,7 @@ func ExtractPrs(taskCtx plugin.SubTaskContext) errors.Error { return extractor.Execute() } -func convertGithubPullRequest(pull GraphqlQueryPr, connId uint64, repoId int) (*models.GithubPullRequest, errors.Error) { +func convertGithubPullRequest(pull *GraphqlQueryPr, connId uint64, repoId int) (*models.GithubPullRequest, errors.Error) { githubPull := &models.GithubPullRequest{ ConnectionId: connId, GithubId: pull.DatabaseId, diff --git a/backend/plugins/github_graphql/tasks/release_collector.go b/backend/plugins/github_graphql/tasks/release_collector.go index 4b71626d0..0090bd60c 100644 --- a/backend/plugins/github_graphql/tasks/release_collector.go +++ b/backend/plugins/github_graphql/tasks/release_collector.go @@ -18,6 +18,7 @@ limitations under the License. package tasks import ( + "encoding/json" "strings" "time" @@ -97,6 +98,7 @@ func CollectRelease(taskCtx plugin.SubTaskContext) errors.Error { return err } + since := apiCollector.GetSince() err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{ GraphqlClient: data.GraphqlClient, PageSize: 100, @@ -119,15 +121,16 @@ func CollectRelease(taskCtx plugin.SubTaskContext) errors.Error { query := iQuery.(*GraphqlQueryReleaseWrapper) return query.Repository.Releases.PageInfo, nil }, - ResponseParser: func(iQuery interface{}, variables map[string]interface{}) ([]interface{}, error) { - query := iQuery.(*GraphqlQueryReleaseWrapper) - deployments := query.Repository.Releases.Releases - for _, rawL := range deployments { - if apiCollector.GetSince() != nil && !apiCollector.GetSince().Before(rawL.UpdatedAt) { - return nil, helper.ErrFinishCollect + ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) { + query := queryWrapper.(*GraphqlQueryReleaseWrapper) + releases := query.Repository.Releases.Releases + for _, rawL := range releases { + if since != nil && since.After(rawL.UpdatedAt) { + return messages, helper.ErrFinishCollect } + messages = append(messages, errors.Must1(json.Marshal(rawL))) } - return nil, nil + return }, }) if err != nil {
