This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch feat-gitlab-tx-inc
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/feat-gitlab-tx-inc by this push:
     new 5ba00cfce feat: gitlab mr_extractor support incremental sync
5ba00cfce is described below

commit 5ba00cfced6f2af25235b5958a35e175fad15563
Author: Klesh Wong <zhenmian.hu...@merico.dev>
AuthorDate: Mon Sep 2 21:18:29 2024 +0800

    feat: gitlab mr_extractor support incremental sync
---
 backend/helpers/pluginhelper/api/api_rawdata.go    |  2 +-
 .../pluginhelper/api/subtask_state_manager.go      |  2 +-
 backend/impls/dalgorm/encdec_serializer.go         |  5 +++++
 backend/plugins/gitlab/tasks/mr_extractor.go       | 22 +++++++++++-----------
 backend/plugins/gitlab/tasks/shared.go             | 19 ++++++++++++++++---
 5 files changed, 34 insertions(+), 16 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/api_rawdata.go 
b/backend/helpers/pluginhelper/api/api_rawdata.go
index ef16dde6d..5e9bb7835 100644
--- a/backend/helpers/pluginhelper/api/api_rawdata.go
+++ b/backend/helpers/pluginhelper/api/api_rawdata.go
@@ -34,7 +34,7 @@ type RawData struct {
        Data      []byte
        Url       string
        Input     json.RawMessage `gorm:"type:json"`
-       CreatedAt time.Time
+       CreatedAt time.Time       `gorm:"index"`
 }
 
 type TaskOptions interface {
diff --git a/backend/helpers/pluginhelper/api/subtask_state_manager.go 
b/backend/helpers/pluginhelper/api/subtask_state_manager.go
index 407a291bf..7dd7d2370 100644
--- a/backend/helpers/pluginhelper/api/subtask_state_manager.go
+++ b/backend/helpers/pluginhelper/api/subtask_state_manager.go
@@ -34,7 +34,7 @@ type SubtaskCommonArgs struct {
        plugin.SubTaskContext
        Table         string // raw table name
        Params        any    // for filtering rows belonging to the scope (jira 
board, github repo) of the subtask
-       SubtaskConfig any    // for determining whether the subtask should run 
in incremental or full sync mode
+       SubtaskConfig any    // for determining whether the subtask should run 
in Incremental or Full-Sync mode by comparing with the previous config to see 
if it changed
        BatchSize     int    // batch size for saving data
 }
 
diff --git a/backend/impls/dalgorm/encdec_serializer.go 
b/backend/impls/dalgorm/encdec_serializer.go
index bb6589195..fd7a9d880 100644
--- a/backend/impls/dalgorm/encdec_serializer.go
+++ b/backend/impls/dalgorm/encdec_serializer.go
@@ -91,6 +91,11 @@ func (es *EncDecSerializer) Value(ctx context.Context, field 
*schema.Field, dst
                }
                target = string(b)
        }
+       if field.GORMDataType == "string" {
+               println("field.GORMDataType == string", field.Size)
+               gormTag, ok := field.Tag.Lookup("gorm")
+               println(ok, gormTag)
+       }
        return plugin.Encrypt(es.encryptionSecret, target)
 }
 
diff --git a/backend/plugins/gitlab/tasks/mr_extractor.go 
b/backend/plugins/gitlab/tasks/mr_extractor.go
index 76253833b..f25080527 100644
--- a/backend/plugins/gitlab/tasks/mr_extractor.go
+++ b/backend/plugins/gitlab/tasks/mr_extractor.go
@@ -96,8 +96,8 @@ var ExtractApiMergeRequestsMeta = plugin.SubTaskMeta{
        Dependencies:     []*plugin.SubTaskMeta{&CollectApiMergeRequestsMeta},
 }
 
-func ExtractApiMergeRequests(taskCtx plugin.SubTaskContext) errors.Error {
-       rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_MERGE_REQUEST_TABLE)
+func ExtractApiMergeRequests(subtaskCtx plugin.SubTaskContext) errors.Error {
+       subtaskCommonArgs, data := CreateSubtaskCommonArgs(subtaskCtx, 
RAW_MERGE_REQUEST_TABLE)
        config := data.Options.ScopeConfig
        var labelTypeRegex *regexp.Regexp
        var labelComponentRegex *regexp.Regexp
@@ -118,10 +118,14 @@ func ExtractApiMergeRequests(taskCtx 
plugin.SubTaskContext) errors.Error {
                }
        }
 
-       extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{
-               RawDataSubTaskArgs: *rawDataSubTaskArgs,
-               Extract: func(row *api.RawData) ([]interface{}, errors.Error) {
+       subtaskCommonArgs.SubtaskConfig = map[string]any{
+               "prType":      prType,
+               "prComponent": prComponent,
+       }
 
+       extractor, err := 
api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs{
+               SubtaskCommonArgs: subtaskCommonArgs,
+               Extract: func(row *api.RawData) ([]interface{}, errors.Error) {
                        mr := &MergeRequestRes{}
                        s := string(row.Data)
                        err := errors.Convert(json.Unmarshal(row.Data, mr))
@@ -142,7 +146,7 @@ func ExtractApiMergeRequests(taskCtx plugin.SubTaskContext) 
errors.Error {
                                }
                        }
 
-                       results := make([]interface{}, 0, len(mr.Reviewers)+1)
+                       results := make([]interface{}, 0, 
len(mr.Reviewers)+len(mr.Labels)+1)
                        gitlabMergeRequest.ConnectionId = 
data.Options.ConnectionId
                        results = append(results, gitlabMergeRequest)
                        for _, label := range mr.Labels {
@@ -188,17 +192,13 @@ func ExtractApiMergeRequests(taskCtx 
plugin.SubTaskContext) errors.Error {
                                }
                                results = append(results, gitlabAssignee)
                        }
-
                        return results, nil
                },
        })
-
        if err != nil {
-               return errors.Convert(err)
+               return err
        }
-
        return extractor.Execute()
-
 }
 
 func convertMergeRequest(mr *MergeRequestRes) (*models.GitlabMergeRequest, 
errors.Error) {
diff --git a/backend/plugins/gitlab/tasks/shared.go 
b/backend/plugins/gitlab/tasks/shared.go
index 4051c93b5..976052968 100644
--- a/backend/plugins/gitlab/tasks/shared.go
+++ b/backend/plugins/gitlab/tasks/shared.go
@@ -155,10 +155,10 @@ func GetQuery(reqData *helper.RequestData) (url.Values, 
errors.Error) {
        return query, nil
 }
 
-func CreateRawDataSubTaskArgs(taskCtx plugin.SubTaskContext, Table string) 
(*helper.RawDataSubTaskArgs, *GitlabTaskData) {
-       data := taskCtx.GetData().(*GitlabTaskData)
+func CreateRawDataSubTaskArgs(subtaskCtx plugin.SubTaskContext, Table string) 
(*helper.RawDataSubTaskArgs, *GitlabTaskData) {
+       data := subtaskCtx.GetData().(*GitlabTaskData)
        rawDataSubTaskArgs := &helper.RawDataSubTaskArgs{
-               Ctx: taskCtx,
+               Ctx: subtaskCtx,
                Params: models.GitlabApiParams{
                        ProjectId:    data.Options.ProjectId,
                        ConnectionId: data.Options.ConnectionId,
@@ -168,6 +168,19 @@ func CreateRawDataSubTaskArgs(taskCtx 
plugin.SubTaskContext, Table string) (*hel
        return rawDataSubTaskArgs, data
 }
 
+func CreateSubtaskCommonArgs(subtaskCtx plugin.SubTaskContext, table string) 
(*helper.SubtaskCommonArgs, *GitlabTaskData) {
+       data := subtaskCtx.GetData().(*GitlabTaskData)
+       args := &helper.SubtaskCommonArgs{
+               SubTaskContext: subtaskCtx,
+               Table:          table,
+               Params: models.GitlabApiParams{
+                       ConnectionId: data.Options.ConnectionId,
+                       ProjectId:    data.Options.ProjectId,
+               },
+       }
+       return args, data
+}
+
 func GetMergeRequestsIterator(taskCtx plugin.SubTaskContext, apiCollector 
*helper.StatefulApiCollector) (*helper.DalCursorIterator, errors.Error) {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*GitlabTaskData)

Reply via email to