This is an automated email from the ASF dual-hosted git repository. klesh pushed a commit to branch kw-7852-components-field-length in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit d16f143e1f7d31d15abaf4aea9995ff68bf17e35 Author: Klesh Wong <[email protected]> AuthorDate: Wed Aug 7 17:58:07 2024 +0800 fix: github issues not being updated --- .../github_graphql/tasks/issue_collector.go | 86 ++++++++++++++++++++-- 1 file changed, 78 insertions(+), 8 deletions(-) diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go b/backend/plugins/github_graphql/tasks/issue_collector.go index f2d27cc5f..1bdb3d379 100644 --- a/backend/plugins/github_graphql/tasks/issue_collector.go +++ b/backend/plugins/github_graphql/tasks/issue_collector.go @@ -19,12 +19,15 @@ package tasks import ( "encoding/json" + "reflect" "strings" "time" + "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/plugin" - helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api" + "github.com/apache/incubator-devlake/helpers/pluginhelper/api" + "github.com/apache/incubator-devlake/plugins/github/models" githubTasks "github.com/apache/incubator-devlake/plugins/github/tasks" "github.com/merico-dev/graphql" ) @@ -39,11 +42,20 @@ type GraphqlQueryIssueWrapper struct { IssueList struct { TotalCount graphql.Int Issues []GraphqlQueryIssue `graphql:"nodes"` - PageInfo *helper.GraphqlQueryPageInfo + PageInfo *api.GraphqlQueryPageInfo } `graphql:"issues(first: $pageSize, after: $skipCursor, orderBy: {field: UPDATED_AT, direction: DESC})"` } `graphql:"repository(owner: $owner, name: $name)"` } +type GraphqlQueryIssueDetailWrapper struct { + RateLimit struct { + Cost int + } + Repository struct { + Issues []GraphqlQueryIssue `graphql:"issue(number: $number)" graphql-extend:"true"` + } `graphql:"repository(owner: $owner, name: $name)"` +} + type GraphqlQueryIssue struct { DatabaseId int Number int @@ -83,7 +95,7 @@ var _ plugin.SubTaskEntryPoint = CollectIssues func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*githubTasks.GithubTaskData) - apiCollector, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ + apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ Ctx: taskCtx, Params: githubTasks.GithubApiParams{ ConnectionId: data.Options.ConnectionId, @@ -95,10 +107,12 @@ func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error { return err } - err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{ + // collect new issues since the previous run + since := apiCollector.GetSince() + err = apiCollector.InitGraphQLCollector(api.GraphqlCollectorArgs{ GraphqlClient: data.GraphqlClient, PageSize: 10, - BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) { + BuildQuery: func(reqData *api.GraphqlRequestData) (interface{}, map[string]interface{}, error) { query := &GraphqlQueryIssueWrapper{} if reqData == nil { return query, map[string]interface{}{}, nil @@ -112,7 +126,7 @@ func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error { } return query, variables, nil }, - GetPageInfo: func(iQuery interface{}, args *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) { + GetPageInfo: func(iQuery interface{}, args *api.GraphqlCollectorArgs) (*api.GraphqlQueryPageInfo, error) { query := iQuery.(*GraphqlQueryIssueWrapper) return query.Repository.IssueList.PageInfo, nil }, @@ -120,8 +134,8 @@ func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error { query := queryWrapper.(*GraphqlQueryIssueWrapper) issues := query.Repository.IssueList.Issues for _, rawL := range issues { - if apiCollector.GetSince() != nil && !apiCollector.GetSince().Before(rawL.UpdatedAt) { - return messages, helper.ErrFinishCollect + if since != nil && since.After(rawL.UpdatedAt) { + return messages, api.ErrFinishCollect } messages = append(messages, errors.Must1(json.Marshal(rawL))) } @@ -132,5 +146,61 @@ func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error { return err } + // refetch(refresh) for existing issues in the database that are still OPEN + db := taskCtx.GetDal() + cursor, err := db.Cursor( + dal.From(models.GithubIssue{}.TableName()), + dal.Where("state = ? AND repo_id = ? AND connection_id=?", "OPEN", data.Options.GithubId, data.Options.ConnectionId), + ) + if err != nil { + return err + } + iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.GithubIssue{})) + if err != nil { + return err + } + issueUpdatedAt := make(map[int]time.Time) + err = apiCollector.InitGraphQLCollector(api.GraphqlCollectorArgs{ + GraphqlClient: data.GraphqlClient, + Input: iterator, + InputStep: 100, + Incremental: true, + BuildQuery: func(reqData *api.GraphqlRequestData) (interface{}, map[string]interface{}, error) { + query := &GraphqlQueryIssueDetailWrapper{} + if reqData == nil { + return query, map[string]interface{}{}, nil + } + ownerName := strings.Split(data.Options.Name, "/") + inputIssues := reqData.Input.([]interface{}) + outputIssues := []map[string]interface{}{} + for _, i := range inputIssues { + inputIssue := i.(*models.GithubIssue) + outputIssues = append(outputIssues, map[string]interface{}{ + `number`: graphql.Int(inputIssue.Number), + }) + issueUpdatedAt[inputIssue.Number] = inputIssue.GithubUpdatedAt + } + variables := map[string]interface{}{ + "issue": outputIssues, + "owner": graphql.String(ownerName[0]), + "name": graphql.String(ownerName[1]), + } + return query, variables, nil + }, + ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) { + query := queryWrapper.(*GraphqlQueryIssueDetailWrapper) + issues := query.Repository.Issues + for _, rawL := range issues { + if rawL.UpdatedAt.After(issueUpdatedAt[rawL.Number]) { + messages = append(messages, errors.Must1(json.Marshal(rawL))) + } + } + return + }, + }) + if err != nil { + return err + } + return apiCollector.Execute() }
