This is an automated email from the ASF dual-hosted git repository. klesh pushed a commit to branch feat-gitlab-tx-inc in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/feat-gitlab-tx-inc by this push: new 5ba00cfce feat: gitlab mr_extractor support incremental sync 5ba00cfce is described below commit 5ba00cfced6f2af25235b5958a35e175fad15563 Author: Klesh Wong <zhenmian.hu...@merico.dev> AuthorDate: Mon Sep 2 21:18:29 2024 +0800 feat: gitlab mr_extractor support incremental sync --- backend/helpers/pluginhelper/api/api_rawdata.go | 2 +- .../pluginhelper/api/subtask_state_manager.go | 2 +- backend/impls/dalgorm/encdec_serializer.go | 5 +++++ backend/plugins/gitlab/tasks/mr_extractor.go | 22 +++++++++++----------- backend/plugins/gitlab/tasks/shared.go | 19 ++++++++++++++++--- 5 files changed, 34 insertions(+), 16 deletions(-) diff --git a/backend/helpers/pluginhelper/api/api_rawdata.go b/backend/helpers/pluginhelper/api/api_rawdata.go index ef16dde6d..5e9bb7835 100644 --- a/backend/helpers/pluginhelper/api/api_rawdata.go +++ b/backend/helpers/pluginhelper/api/api_rawdata.go @@ -34,7 +34,7 @@ type RawData struct { Data []byte Url string Input json.RawMessage `gorm:"type:json"` - CreatedAt time.Time + CreatedAt time.Time `gorm:"index"` } type TaskOptions interface { diff --git a/backend/helpers/pluginhelper/api/subtask_state_manager.go b/backend/helpers/pluginhelper/api/subtask_state_manager.go index 407a291bf..7dd7d2370 100644 --- a/backend/helpers/pluginhelper/api/subtask_state_manager.go +++ b/backend/helpers/pluginhelper/api/subtask_state_manager.go @@ -34,7 +34,7 @@ type SubtaskCommonArgs struct { plugin.SubTaskContext Table string // raw table name Params any // for filtering rows belonging to the scope (jira board, github repo) of the subtask - SubtaskConfig any // for determining whether the subtask should run in incremental or full sync mode + SubtaskConfig any // for determining whether the subtask should run in Incremental or Full-Sync mode by comparing with the previous config to see if it changed BatchSize int // batch size for saving data } diff --git a/backend/impls/dalgorm/encdec_serializer.go b/backend/impls/dalgorm/encdec_serializer.go index bb6589195..fd7a9d880 100644 --- a/backend/impls/dalgorm/encdec_serializer.go +++ b/backend/impls/dalgorm/encdec_serializer.go @@ -91,6 +91,11 @@ func (es *EncDecSerializer) Value(ctx context.Context, field *schema.Field, dst } target = string(b) } + if field.GORMDataType == "string" { + println("field.GORMDataType == string", field.Size) + gormTag, ok := field.Tag.Lookup("gorm") + println(ok, gormTag) + } return plugin.Encrypt(es.encryptionSecret, target) } diff --git a/backend/plugins/gitlab/tasks/mr_extractor.go b/backend/plugins/gitlab/tasks/mr_extractor.go index 76253833b..f25080527 100644 --- a/backend/plugins/gitlab/tasks/mr_extractor.go +++ b/backend/plugins/gitlab/tasks/mr_extractor.go @@ -96,8 +96,8 @@ var ExtractApiMergeRequestsMeta = plugin.SubTaskMeta{ Dependencies: []*plugin.SubTaskMeta{&CollectApiMergeRequestsMeta}, } -func ExtractApiMergeRequests(taskCtx plugin.SubTaskContext) errors.Error { - rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_MERGE_REQUEST_TABLE) +func ExtractApiMergeRequests(subtaskCtx plugin.SubTaskContext) errors.Error { + subtaskCommonArgs, data := CreateSubtaskCommonArgs(subtaskCtx, RAW_MERGE_REQUEST_TABLE) config := data.Options.ScopeConfig var labelTypeRegex *regexp.Regexp var labelComponentRegex *regexp.Regexp @@ -118,10 +118,14 @@ func ExtractApiMergeRequests(taskCtx plugin.SubTaskContext) errors.Error { } } - extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{ - RawDataSubTaskArgs: *rawDataSubTaskArgs, - Extract: func(row *api.RawData) ([]interface{}, errors.Error) { + subtaskCommonArgs.SubtaskConfig = map[string]any{ + "prType": prType, + "prComponent": prComponent, + } + extractor, err := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs{ + SubtaskCommonArgs: subtaskCommonArgs, + Extract: func(row *api.RawData) ([]interface{}, errors.Error) { mr := &MergeRequestRes{} s := string(row.Data) err := errors.Convert(json.Unmarshal(row.Data, mr)) @@ -142,7 +146,7 @@ func ExtractApiMergeRequests(taskCtx plugin.SubTaskContext) errors.Error { } } - results := make([]interface{}, 0, len(mr.Reviewers)+1) + results := make([]interface{}, 0, len(mr.Reviewers)+len(mr.Labels)+1) gitlabMergeRequest.ConnectionId = data.Options.ConnectionId results = append(results, gitlabMergeRequest) for _, label := range mr.Labels { @@ -188,17 +192,13 @@ func ExtractApiMergeRequests(taskCtx plugin.SubTaskContext) errors.Error { } results = append(results, gitlabAssignee) } - return results, nil }, }) - if err != nil { - return errors.Convert(err) + return err } - return extractor.Execute() - } func convertMergeRequest(mr *MergeRequestRes) (*models.GitlabMergeRequest, errors.Error) { diff --git a/backend/plugins/gitlab/tasks/shared.go b/backend/plugins/gitlab/tasks/shared.go index 4051c93b5..976052968 100644 --- a/backend/plugins/gitlab/tasks/shared.go +++ b/backend/plugins/gitlab/tasks/shared.go @@ -155,10 +155,10 @@ func GetQuery(reqData *helper.RequestData) (url.Values, errors.Error) { return query, nil } -func CreateRawDataSubTaskArgs(taskCtx plugin.SubTaskContext, Table string) (*helper.RawDataSubTaskArgs, *GitlabTaskData) { - data := taskCtx.GetData().(*GitlabTaskData) +func CreateRawDataSubTaskArgs(subtaskCtx plugin.SubTaskContext, Table string) (*helper.RawDataSubTaskArgs, *GitlabTaskData) { + data := subtaskCtx.GetData().(*GitlabTaskData) rawDataSubTaskArgs := &helper.RawDataSubTaskArgs{ - Ctx: taskCtx, + Ctx: subtaskCtx, Params: models.GitlabApiParams{ ProjectId: data.Options.ProjectId, ConnectionId: data.Options.ConnectionId, @@ -168,6 +168,19 @@ func CreateRawDataSubTaskArgs(taskCtx plugin.SubTaskContext, Table string) (*hel return rawDataSubTaskArgs, data } +func CreateSubtaskCommonArgs(subtaskCtx plugin.SubTaskContext, table string) (*helper.SubtaskCommonArgs, *GitlabTaskData) { + data := subtaskCtx.GetData().(*GitlabTaskData) + args := &helper.SubtaskCommonArgs{ + SubTaskContext: subtaskCtx, + Table: table, + Params: models.GitlabApiParams{ + ConnectionId: data.Options.ConnectionId, + ProjectId: data.Options.ProjectId, + }, + } + return args, data +} + func GetMergeRequestsIterator(taskCtx plugin.SubTaskContext, apiCollector *helper.StatefulApiCollector) (*helper.DalCursorIterator, errors.Error) { db := taskCtx.GetDal() data := taskCtx.GetData().(*GitlabTaskData)