This is an automated email from the ASF dual-hosted git repository. klesh pushed a commit to branch kw-gitext-diffsync in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit 62ba460a97fc6afe8d767a54af767e9af2cf1a61 Author: Klesh Wong <[email protected]> AuthorDate: Mon Apr 15 15:53:17 2024 +0800 refactor: StatefulApiCollector adopts CollectorStateManager --- ...tor_with_state.go => api_collector_stateful.go} | 155 ++++----------------- .../plugins/azuredevops_go/tasks/pr_collector.go | 15 +- backend/plugins/bitbucket/tasks/api_common.go | 24 ++-- .../plugins/bitbucket_server/tasks/api_common.go | 6 +- backend/plugins/github/tasks/cicd_job_collector.go | 10 +- backend/plugins/github/tasks/comment_collector.go | 10 +- backend/plugins/github/tasks/commit_collector.go | 10 +- backend/plugins/github/tasks/issue_collector.go | 10 +- .../plugins/github/tasks/pr_commit_collector.go | 10 +- .../plugins/github/tasks/pr_review_collector.go | 10 +- .../github/tasks/pr_review_comment_collector.go | 10 +- .../github_graphql/tasks/deployment_collector.go | 8 +- .../github_graphql/tasks/issue_collector.go | 8 +- .../plugins/github_graphql/tasks/job_collector.go | 10 +- .../plugins/github_graphql/tasks/pr_collector.go | 8 +- .../plugins/gitlab/tasks/deployment_collector.go | 14 +- backend/plugins/gitlab/tasks/issue_collector.go | 4 +- backend/plugins/gitlab/tasks/mr_collector.go | 10 +- .../plugins/gitlab/tasks/mr_detail_collector.go | 6 +- backend/plugins/gitlab/tasks/pipeline_collector.go | 10 +- .../gitlab/tasks/pipeline_detail_collector.go | 6 +- backend/plugins/gitlab/tasks/shared.go | 8 +- .../plugins/gitlab/tasks/trigger_job_collector.go | 16 +-- backend/plugins/jenkins/tasks/stage_collector.go | 10 +- .../jira/tasks/development_panel_collector.go | 10 +- backend/plugins/jira/tasks/epic_collector.go | 10 +- .../jira/tasks/issue_changelog_collector.go | 10 +- backend/plugins/jira/tasks/issue_collector.go | 10 +- .../plugins/jira/tasks/issue_comment_collector.go | 10 +- backend/plugins/jira/tasks/remotelink_collector.go | 10 +- backend/plugins/jira/tasks/worklog_collector.go | 16 ++- .../plugins/tapd/tasks/bug_changelog_collector.go | 10 +- 
backend/plugins/tapd/tasks/bug_collector.go | 10 +- backend/plugins/tapd/tasks/bug_commit_collector.go | 10 +- backend/plugins/tapd/tasks/iteration_collector.go | 10 +- backend/plugins/tapd/tasks/story_bug_collector.go | 10 +- .../tapd/tasks/story_changelog_collector.go | 4 +- backend/plugins/tapd/tasks/story_collector.go | 10 +- .../plugins/tapd/tasks/story_commit_collector.go | 10 +- .../plugins/tapd/tasks/task_changelog_collector.go | 10 +- backend/plugins/tapd/tasks/task_collector.go | 6 +- .../plugins/tapd/tasks/task_commit_collector.go | 10 +- backend/plugins/tapd/tasks/worklog_collector.go | 10 +- .../plugins/zentao/tasks/bug_commits_collector.go | 10 +- .../zentao/tasks/story_commits_collector.go | 10 +- .../plugins/zentao/tasks/task_commits_collector.go | 10 +- 46 files changed, 255 insertions(+), 349 deletions(-) diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go b/backend/helpers/pluginhelper/api/api_collector_stateful.go similarity index 67% rename from backend/helpers/pluginhelper/api/api_collector_with_state.go rename to backend/helpers/pluginhelper/api/api_collector_stateful.go index 513c442cb..2c1086bbc 100644 --- a/backend/helpers/pluginhelper/api/api_collector_with_state.go +++ b/backend/helpers/pluginhelper/api/api_collector_stateful.go @@ -21,172 +21,74 @@ import ( "encoding/json" "net/http" "net/url" - "reflect" "time" - "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" - "github.com/apache/incubator-devlake/core/models" "github.com/apache/incubator-devlake/core/plugin" ) -// ApiCollectorStateManager save collector state in framework table -type ApiCollectorStateManager struct { +// StatefulApiCollector runs multiple collectors as a single subtask and maintains the state of the collector +// mainly the time range to collect across multiple collections. It is useful when you need to support timeAfter +// and diff sync for APIs that do not support filtering by the updated date. 
+type StatefulApiCollector struct { RawDataSubTaskArgs + CollectorStateManager // *ApiCollector // *GraphqlCollector - subtasks []plugin.SubTask - newState models.CollectorLatestState - IsIncremental bool - Since *time.Time - Before *time.Time + nestedCollectors []plugin.SubTask } -type CollectorOptions struct { - TimeAfter string `json:"timeAfter,omitempty" mapstructure:"timeAfter,omitempty"` -} - -// NewStatefulApiCollector create a new ApiCollectorStateManager -func NewStatefulApiCollector(args RawDataSubTaskArgs) (*ApiCollectorStateManager, errors.Error) { - db := args.Ctx.GetDal() +// NewStatefulApiCollector create a new StatefulApiCollector +func NewStatefulApiCollector(args RawDataSubTaskArgs) (*StatefulApiCollector, errors.Error) { syncPolicy := args.Ctx.TaskContext().SyncPolicy() rawDataSubTask, err := NewRawDataSubTask(args) if err != nil { - return nil, errors.Default.Wrap(err, "Couldn't resolve raw subtask args") - } - - // get optionTimeAfter from options - data := args.Ctx.GetData() - value := reflect.ValueOf(data) - if value.Kind() == reflect.Ptr && value.Elem().Kind() == reflect.Struct { - options := value.Elem().FieldByName("Options") - if options.IsValid() && options.Kind() == reflect.Ptr && options.Elem().Kind() == reflect.Struct { - collectorOptions := options.Elem().FieldByName("CollectorOptions") - if collectorOptions.IsValid() && collectorOptions.Kind() == reflect.Struct { - timeAfter := collectorOptions.FieldByName("TimeAfter") - if timeAfter.IsValid() && timeAfter.Kind() == reflect.String && timeAfter.String() != "" { - optionTimeAfter, parseErr := time.Parse(time.RFC3339, timeAfter.String()) - if parseErr != nil { - return nil, errors.Default.Wrap(parseErr, "Failed to parse timeAfter!") - } - if syncPolicy != nil { - syncPolicy.TimeAfter = &optionTimeAfter - } else { - syncPolicy = &models.SyncPolicy{ - TimeAfter: &optionTimeAfter, - } - } - } - } - } + return nil, err } - - // CollectorLatestState retrieves the latest collector state from 
the database - oldState := models.CollectorLatestState{} - err = db.First(&oldState, dal.Where(`raw_data_table = ? AND raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params)) + stateManager, err := NewCollectorStateManager(args.Ctx, syncPolicy, rawDataSubTask.table, rawDataSubTask.params) if err != nil { - if db.IsErrorNotFound(err) { - oldState = models.CollectorLatestState{ - RawDataTable: rawDataSubTask.table, - RawDataParams: rawDataSubTask.params, - } - } else { - return nil, errors.Default.Wrap(err, "failed to load JiraLatestCollectorMeta") - } - } - // Extract timeAfter and latestSuccessStart from old state - oldTimeAfter := oldState.TimeAfter - oldLatestSuccessStart := oldState.LatestSuccessStart - - // Calculate incremental and since based on syncPolicy and old state - var isIncremental bool - var since *time.Time - - if oldLatestSuccessStart == nil { - // 1. If no oldState.LatestSuccessStart, not incremental and since is syncPolicy.TimeAfter - isIncremental = false - if syncPolicy != nil { - since = syncPolicy.TimeAfter - } - } else if syncPolicy == nil { - // 2. If no syncPolicy, incremental and since is oldState.LatestSuccessStart - isIncremental = true - since = oldLatestSuccessStart - } else if syncPolicy.FullSync { - // 3. If fullSync true, not incremental and since is syncPolicy.TimeAfter - isIncremental = false - since = syncPolicy.TimeAfter - } else if syncPolicy.TimeAfter == nil { - // 4. If no syncPolicy TimeAfter, incremental and since is oldState.LatestSuccessStart - isIncremental = true - since = oldLatestSuccessStart - } else { - // 5. 
If syncPolicy.TimeAfter not nil - if oldTimeAfter != nil && syncPolicy.TimeAfter.Before(*oldTimeAfter) { - // 4.1 If oldTimeAfter not nil and syncPolicy.TimeAfter before oldTimeAfter, incremental is false and since is syncPolicy.TimeAfter - isIncremental = false - since = syncPolicy.TimeAfter - } else { - // 4.2 If oldTimeAfter nil or syncPolicy.TimeAfter after oldTimeAfter, incremental is true and since is oldState.LatestSuccessStart - isIncremental = true - since = oldLatestSuccessStart - } - } - - currentTime := time.Now() - oldState.LatestSuccessStart = &currentTime - if syncPolicy != nil { - oldState.TimeAfter = syncPolicy.TimeAfter - if syncPolicy.TimeAfter != nil && oldTimeAfter != nil && (oldTimeAfter).Before(*syncPolicy.TimeAfter) && !syncPolicy.FullSync { - oldState.TimeAfter = oldTimeAfter - } + return nil, err } - - return &ApiCollectorStateManager{ - RawDataSubTaskArgs: args, - newState: oldState, - IsIncremental: isIncremental, - Since: since, - Before: &currentTime, + return &StatefulApiCollector{ + RawDataSubTaskArgs: args, + CollectorStateManager: *stateManager, }, nil - } -// InitCollector init the embedded collector -func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs) errors.Error { +// InitCollector appends a new collector to the list +func (m *StatefulApiCollector) InitCollector(args ApiCollectorArgs) errors.Error { args.RawDataSubTaskArgs = m.RawDataSubTaskArgs - args.Incremental = args.Incremental || m.IsIncremental + args.Incremental = m.CollectorStateManager.IsIncremental() apiCollector, err := NewApiCollector(args) if err != nil { return err } - m.subtasks = append(m.subtasks, apiCollector) + m.nestedCollectors = append(m.nestedCollectors, apiCollector) return nil } -// InitGraphQLCollector init the embedded collector -func (m *ApiCollectorStateManager) InitGraphQLCollector(args GraphqlCollectorArgs) errors.Error { +// InitGraphQLCollector appends a new GraphQL collector to the list +func (m *StatefulApiCollector) 
InitGraphQLCollector(args GraphqlCollectorArgs) errors.Error { args.RawDataSubTaskArgs = m.RawDataSubTaskArgs - args.Incremental = args.Incremental || m.IsIncremental + args.Incremental = m.CollectorStateManager.IsIncremental() graphqlCollector, err := NewGraphqlCollector(args) if err != nil { return err } - m.subtasks = append(m.subtasks, graphqlCollector) + m.nestedCollectors = append(m.nestedCollectors, graphqlCollector) return nil } -// Execute the embedded collector and record execute state -func (m *ApiCollectorStateManager) Execute() errors.Error { - for _, subtask := range m.subtasks { +// Execute all nested collectors and save the state if all collectors succeed +func (m *StatefulApiCollector) Execute() errors.Error { + for _, subtask := range m.nestedCollectors { err := subtask.Execute() if err != nil { return err } } - db := m.Ctx.GetDal() - return db.CreateOrUpdate(&m.newState) + return m.CollectorStateManager.Close() } // NewStatefulApiCollectorForFinalizableEntity aims to add timeFilter/diffSync support for @@ -222,8 +124,8 @@ func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArg return nil, err } - createdAfter := manager.Since - isIncremental := manager.IsIncremental + createdAfter := manager.CollectorStateManager.GetSince() + isIncremental := manager.CollectorStateManager.IsIncremental() // step 1: create a collector to collect newly added records err = manager.InitCollector(ApiCollectorArgs{ @@ -329,7 +231,6 @@ func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArg type FinalizableApiCollectorArgs struct { RawDataSubTaskArgs ApiClient RateLimitedApiClient - TimeAfter *time.Time // leave it be nil to disable time filter CollectNewRecordsByList FinalizableApiCollectorListArgs CollectUnfinishedDetails *FinalizableApiCollectorDetailArgs } diff --git a/backend/plugins/azuredevops_go/tasks/pr_collector.go b/backend/plugins/azuredevops_go/tasks/pr_collector.go index 7933b3d46..30a63dba7 100644 --- 
a/backend/plugins/azuredevops_go/tasks/pr_collector.go +++ b/backend/plugins/azuredevops_go/tasks/pr_collector.go @@ -19,11 +19,12 @@ package tasks import ( "fmt" + "net/url" + "time" + "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/plugin" "github.com/apache/incubator-devlake/helpers/pluginhelper/api" - "net/url" - "time" ) func init() { @@ -51,12 +52,12 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error { Options: data.Options, } - collectorWithState, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs) + apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs) if err != nil { return err } - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ RawDataSubTaskArgs: *rawDataSubTaskArgs, ApiClient: data.ApiClient, PageSize: 100, @@ -67,9 +68,9 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error { query.Set("$skip", fmt.Sprint(reqData.Pager.Skip)) query.Set("$top", fmt.Sprint(reqData.Pager.Size)) - if collectorWithState.Since != nil { + if apiCollector.GetSince() != nil { query.Set("searchCriteria.queryTimeRangeType", "created") - query.Set("searchCriteria.minTime", collectorWithState.Since.Format(time.RFC3339)) + query.Set("searchCriteria.minTime", apiCollector.GetSince().Format(time.RFC3339)) } return query, nil }, @@ -81,5 +82,5 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/bitbucket/tasks/api_common.go b/backend/plugins/bitbucket/tasks/api_common.go index 8990fdbaf..352f41cb9 100644 --- a/backend/plugins/bitbucket/tasks/api_common.go +++ b/backend/plugins/bitbucket/tasks/api_common.go @@ -93,7 +93,7 @@ func GetQuery(reqData *api.RequestData) (url.Values, errors.Error) { } // GetQueryCreatedAndUpdated is a common GeyQuery for timeFilter and incremental 
-func GetQueryCreatedAndUpdated(fields string, collectorWithState *api.ApiCollectorStateManager) func(reqData *api.RequestData) (url.Values, errors.Error) { +func GetQueryCreatedAndUpdated(fields string, apiCollector *api.StatefulApiCollector) func(reqData *api.RequestData) (url.Values, errors.Error) { return func(reqData *api.RequestData) (url.Values, errors.Error) { query, err := GetQuery(reqData) if err != nil { @@ -102,8 +102,8 @@ func GetQueryCreatedAndUpdated(fields string, collectorWithState *api.ApiCollect query.Set("fields", fields) query.Set("sort", "created_on") - if collectorWithState.Since != nil { - query.Set("q", fmt.Sprintf("updated_on>=%s", collectorWithState.Since.Format(time.RFC3339))) + if apiCollector.GetSince() != nil { + query.Set("q", fmt.Sprintf("updated_on>=%s", apiCollector.GetSince().Format(time.RFC3339))) } return query, nil } @@ -164,7 +164,7 @@ func GetRawMessageFromResponse(res *http.Response) ([]json.RawMessage, errors.Er return rawMessages.Values, nil } -func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, collectorWithState *api.ApiCollectorStateManager) (*api.DalCursorIterator, errors.Error) { +func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, sac *api.StatefulApiCollector) (*api.DalCursorIterator, errors.Error) { db := taskCtx.GetDal() data := taskCtx.GetData().(*BitbucketTaskData) clauses := []dal.Clause{ @@ -175,8 +175,8 @@ func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, collectorWithState * data.Options.FullName, data.Options.ConnectionId, ), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", *collectorWithState.Since)) + if sac.IsIncremental() && sac.GetSince() != nil { + clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", *sac.GetSince())) } // construct the input iterator @@ -188,7 +188,7 @@ func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, collectorWithState * return 
api.NewDalCursorIterator(db, cursor, reflect.TypeOf(BitbucketInput{})) } -func GetIssuesIterator(taskCtx plugin.SubTaskContext, collectorWithState *api.ApiCollectorStateManager) (*api.DalCursorIterator, errors.Error) { +func GetIssuesIterator(taskCtx plugin.SubTaskContext, sac *api.StatefulApiCollector) (*api.DalCursorIterator, errors.Error) { db := taskCtx.GetDal() data := taskCtx.GetData().(*BitbucketTaskData) clauses := []dal.Clause{ @@ -199,8 +199,8 @@ func GetIssuesIterator(taskCtx plugin.SubTaskContext, collectorWithState *api.Ap data.Options.FullName, data.Options.ConnectionId, ), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", *collectorWithState.Since)) + if sac.IsIncremental() && sac.GetSince() != nil { + clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", *sac.GetSince())) } // construct the input iterator cursor, err := db.Cursor(clauses...) @@ -211,7 +211,7 @@ func GetIssuesIterator(taskCtx plugin.SubTaskContext, collectorWithState *api.Ap return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(BitbucketInput{})) } -func GetPipelinesIterator(taskCtx plugin.SubTaskContext, collectorWithState *api.ApiCollectorStateManager) (*api.DalCursorIterator, errors.Error) { +func GetPipelinesIterator(taskCtx plugin.SubTaskContext, sac *api.StatefulApiCollector) (*api.DalCursorIterator, errors.Error) { db := taskCtx.GetDal() data := taskCtx.GetData().(*BitbucketTaskData) clauses := []dal.Clause{ @@ -222,8 +222,8 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext, collectorWithState *api data.Options.FullName, data.Options.ConnectionId, ), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("bitbucket_complete_on > ?", *collectorWithState.Since)) + if sac.IsIncremental() && sac.GetSince() != nil { + clauses = append(clauses, dal.Where("bitbucket_complete_on > ?", *sac.GetSince())) } // 
construct the input iterator cursor, err := db.Cursor(clauses...) diff --git a/backend/plugins/bitbucket_server/tasks/api_common.go b/backend/plugins/bitbucket_server/tasks/api_common.go index 55e99ce5b..0b2ad1f68 100644 --- a/backend/plugins/bitbucket_server/tasks/api_common.go +++ b/backend/plugins/bitbucket_server/tasks/api_common.go @@ -119,7 +119,7 @@ func GetRawMessageFromResponse(res *http.Response) ([]json.RawMessage, errors.Er return rawMessages.Values, nil } -func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, collectorWithState *helper.ApiCollectorStateManager) (*helper.DalCursorIterator, errors.Error) { +func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, apiCollector *helper.StatefulApiCollector) (*helper.DalCursorIterator, errors.Error) { db := taskCtx.GetDal() data := taskCtx.GetData().(*BitbucketServerTaskData) clauses := []dal.Clause{ @@ -131,8 +131,8 @@ func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, collectorWithState * ), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("bpr.bitbucket_server_updated_at > ?", *collectorWithState.Since)) + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where("bpr.bitbucket_server_updated_at > ?", *apiCollector.GetSince())) } // construct the input iterator diff --git a/backend/plugins/github/tasks/cicd_job_collector.go b/backend/plugins/github/tasks/cicd_job_collector.go index dda4b90c0..60780339f 100644 --- a/backend/plugins/github/tasks/cicd_job_collector.go +++ b/backend/plugins/github/tasks/cicd_job_collector.go @@ -52,7 +52,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) // state manager - collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ + apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ Ctx: taskCtx, Params: GithubApiParams{ ConnectionId: 
data.Options.ConnectionId, @@ -73,8 +73,8 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { data.Options.GithubId, data.Options.ConnectionId, ), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("github_updated_at > ?", collectorWithState.Since)) + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where("github_updated_at > ?", apiCollector.GetSince())) } cursor, err := db.Cursor(clauses...) if err != nil { @@ -85,7 +85,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { return err } // collect jobs - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ RawDataSubTaskArgs: api.RawDataSubTaskArgs{ Ctx: taskCtx, Params: GithubApiParams{ @@ -118,7 +118,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { if err != nil { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } type SimpleGithubRun struct { diff --git a/backend/plugins/github/tasks/comment_collector.go b/backend/plugins/github/tasks/comment_collector.go index e06f023f4..9f40e7736 100644 --- a/backend/plugins/github/tasks/comment_collector.go +++ b/backend/plugins/github/tasks/comment_collector.go @@ -46,7 +46,7 @@ var CollectApiCommentsMeta = plugin.SubTaskMeta{ func CollectApiComments(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) - collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ + apiCollector, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ Ctx: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, @@ -58,15 +58,15 @@ func CollectApiComments(taskCtx plugin.SubTaskContext) errors.Error { return err } - err = collectorWithState.InitCollector(helper.ApiCollectorArgs{ + err = apiCollector.InitCollector(helper.ApiCollectorArgs{ ApiClient: 
data.ApiClient, PageSize: 100, UrlTemplate: "repos/{{ .Params.Name }}/issues/comments", Query: func(reqData *helper.RequestData) (url.Values, errors.Error) { query := url.Values{} query.Set("state", "all") - if collectorWithState.Since != nil { - query.Set("since", collectorWithState.Since.String()) + if apiCollector.GetSince() != nil { + query.Set("since", apiCollector.GetSince().String()) } query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page)) query.Set("direction", "asc") @@ -89,5 +89,5 @@ func CollectApiComments(taskCtx plugin.SubTaskContext) errors.Error { return errors.Default.Wrap(err, "error collecting github comments") } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/github/tasks/commit_collector.go b/backend/plugins/github/tasks/commit_collector.go index b735a508d..019060c17 100644 --- a/backend/plugins/github/tasks/commit_collector.go +++ b/backend/plugins/github/tasks/commit_collector.go @@ -46,7 +46,7 @@ var CollectApiCommitsMeta = plugin.SubTaskMeta{ func CollectApiCommits(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) - collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ + apiCollector, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ Ctx: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, @@ -58,7 +58,7 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext) errors.Error { return err } - err = collectorWithState.InitCollector(helper.ApiCollectorArgs{ + err = apiCollector.InitCollector(helper.ApiCollectorArgs{ ApiClient: data.ApiClient, PageSize: 100, /* @@ -77,8 +77,8 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext) errors.Error { Query: func(reqData *helper.RequestData) (url.Values, errors.Error) { query := url.Values{} query.Set("state", "all") - if collectorWithState.Since != nil { - query.Set("since", collectorWithState.Since.String()) + if apiCollector.GetSince() != 
nil { + query.Set("since", apiCollector.GetSince().String()) } query.Set("direction", "asc") query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page)) @@ -117,5 +117,5 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/github/tasks/issue_collector.go b/backend/plugins/github/tasks/issue_collector.go index 4e9b25d21..5558da697 100644 --- a/backend/plugins/github/tasks/issue_collector.go +++ b/backend/plugins/github/tasks/issue_collector.go @@ -46,7 +46,7 @@ var CollectApiIssuesMeta = plugin.SubTaskMeta{ func CollectApiIssues(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) - collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ + apiCollector, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ Ctx: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, @@ -58,7 +58,7 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) errors.Error { return err } - err = collectorWithState.InitCollector(helper.ApiCollectorArgs{ + err = apiCollector.InitCollector(helper.ApiCollectorArgs{ ApiClient: data.ApiClient, PageSize: 100, /* @@ -77,8 +77,8 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) errors.Error { Query: func(reqData *helper.RequestData) (url.Values, errors.Error) { query := url.Values{} query.Set("state", "all") - if collectorWithState.Since != nil { - query.Set("since", collectorWithState.Since.String()) + if apiCollector.GetSince() != nil { + query.Set("since", apiCollector.GetSince().String()) } query.Set("direction", "asc") query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page)) @@ -117,5 +117,5 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/github/tasks/pr_commit_collector.go 
b/backend/plugins/github/tasks/pr_commit_collector.go index 487e6ac14..0604816d8 100644 --- a/backend/plugins/github/tasks/pr_commit_collector.go +++ b/backend/plugins/github/tasks/pr_commit_collector.go @@ -61,7 +61,7 @@ func CollectApiPullRequestCommits(taskCtx plugin.SubTaskContext) errors.Error { db := taskCtx.GetDal() data := taskCtx.GetData().(*GithubTaskData) - collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ + apiCollector, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ Ctx: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, @@ -78,10 +78,10 @@ func CollectApiPullRequestCommits(taskCtx plugin.SubTaskContext) errors.Error { dal.From(models.GithubPullRequest{}.TableName()), dal.Where("repo_id = ? and connection_id=?", data.Options.GithubId, data.Options.ConnectionId), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { clauses = append( clauses, - dal.Where("github_updated_at > ?", collectorWithState.Since), + dal.Where("github_updated_at > ?", apiCollector.GetSince()), ) } @@ -95,7 +95,7 @@ func CollectApiPullRequestCommits(taskCtx plugin.SubTaskContext) errors.Error { if err != nil { return err } - err = collectorWithState.InitCollector(helper.ApiCollectorArgs{ + err = apiCollector.InitCollector(helper.ApiCollectorArgs{ ApiClient: data.ApiClient, PageSize: 100, Input: iterator, @@ -141,5 +141,5 @@ func CollectApiPullRequestCommits(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/github/tasks/pr_review_collector.go b/backend/plugins/github/tasks/pr_review_collector.go index 60430c583..e4a16649d 100644 --- a/backend/plugins/github/tasks/pr_review_collector.go +++ b/backend/plugins/github/tasks/pr_review_collector.go @@ -53,7 +53,7 @@ func CollectApiPullRequestReviews(taskCtx 
plugin.SubTaskContext) errors.Error { db := taskCtx.GetDal() data := taskCtx.GetData().(*GithubTaskData) - collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ + apiCollector, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ Ctx: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, @@ -70,10 +70,10 @@ func CollectApiPullRequestReviews(taskCtx plugin.SubTaskContext) errors.Error { dal.From(models.GithubPullRequest{}.TableName()), dal.Where("repo_id = ? and connection_id=?", data.Options.GithubId, data.Options.ConnectionId), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { clauses = append( clauses, - dal.Where("github_updated_at > ?", collectorWithState.Since), + dal.Where("github_updated_at > ?", apiCollector.GetSince()), ) } @@ -89,7 +89,7 @@ func CollectApiPullRequestReviews(taskCtx plugin.SubTaskContext) errors.Error { return err } - err = collectorWithState.InitCollector(helper.ApiCollectorArgs{ + err = apiCollector.InitCollector(helper.ApiCollectorArgs{ ApiClient: data.ApiClient, PageSize: 100, Input: iterator, @@ -117,5 +117,5 @@ func CollectApiPullRequestReviews(taskCtx plugin.SubTaskContext) errors.Error { if err != nil { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/github/tasks/pr_review_comment_collector.go b/backend/plugins/github/tasks/pr_review_comment_collector.go index d53371444..d6dfb296f 100644 --- a/backend/plugins/github/tasks/pr_review_comment_collector.go +++ b/backend/plugins/github/tasks/pr_review_comment_collector.go @@ -49,7 +49,7 @@ var CollectApiPrReviewCommentsMeta = plugin.SubTaskMeta{ func CollectPrReviewComments(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) - collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ + apiCollector, 
err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ Ctx: taskCtx, Params: GithubApiParams{ ConnectionId: data.Options.ConnectionId, @@ -61,7 +61,7 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext) errors.Error { return err } - err = collectorWithState.InitCollector(helper.ApiCollectorArgs{ + err = apiCollector.InitCollector(helper.ApiCollectorArgs{ ApiClient: data.ApiClient, PageSize: 100, Header: func(reqData *helper.RequestData) (http.Header, errors.Error) { @@ -74,8 +74,8 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext) errors.Error { UrlTemplate: "repos/{{ .Params.Name }}/pulls/comments", Query: func(reqData *helper.RequestData) (url.Values, errors.Error) { query := url.Values{} - if collectorWithState.Since != nil { - query.Set("since", collectorWithState.Since.String()) + if apiCollector.GetSince() != nil { + query.Set("since", apiCollector.GetSince().String()) } query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page)) query.Set("direction", "asc") @@ -97,5 +97,5 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/github_graphql/tasks/deployment_collector.go b/backend/plugins/github_graphql/tasks/deployment_collector.go index 437088629..e3a646baf 100644 --- a/backend/plugins/github_graphql/tasks/deployment_collector.go +++ b/backend/plugins/github_graphql/tasks/deployment_collector.go @@ -86,7 +86,7 @@ type GraphqlQueryDeploymentDeployment struct { // CollectDeployments will request github api via graphql and store the result into raw layer. 
func CollectDeployments(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*githubTasks.GithubTaskData) - collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ + apiCollector, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ Ctx: taskCtx, Params: githubTasks.GithubApiParams{ ConnectionId: data.Options.ConnectionId, @@ -98,7 +98,7 @@ func CollectDeployments(taskCtx plugin.SubTaskContext) errors.Error { return err } - err = collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{ + err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{ GraphqlClient: data.GraphqlClient, PageSize: 100, BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) { @@ -124,7 +124,7 @@ func CollectDeployments(taskCtx plugin.SubTaskContext) errors.Error { query := iQuery.(*GraphqlQueryDeploymentWrapper) deployments := query.Repository.Deployments.Deployments for _, rawL := range deployments { - if collectorWithState.Since != nil && !collectorWithState.Since.Before(rawL.UpdatedAt) { + if apiCollector.GetSince() != nil && !apiCollector.GetSince().Before(rawL.UpdatedAt) { return nil, helper.ErrFinishCollect } } @@ -134,5 +134,5 @@ func CollectDeployments(taskCtx plugin.SubTaskContext) errors.Error { if err != nil { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go b/backend/plugins/github_graphql/tasks/issue_collector.go index 313b7cbac..dbf1b1e95 100644 --- a/backend/plugins/github_graphql/tasks/issue_collector.go +++ b/backend/plugins/github_graphql/tasks/issue_collector.go @@ -82,7 +82,7 @@ var _ plugin.SubTaskEntryPoint = CollectIssues func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*githubTasks.GithubTaskData) - collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ + apiCollector, 
err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ Ctx: taskCtx, Params: githubTasks.GithubApiParams{ ConnectionId: data.Options.ConnectionId, @@ -94,7 +94,7 @@ func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error { return err } - err = collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{ + err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{ GraphqlClient: data.GraphqlClient, PageSize: 10, BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) { @@ -119,7 +119,7 @@ func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error { query := iQuery.(*GraphqlQueryIssueWrapper) issues := query.Repository.IssueList.Issues for _, rawL := range issues { - if collectorWithState.Since != nil && !collectorWithState.Since.Before(rawL.UpdatedAt) { + if apiCollector.GetSince() != nil && !apiCollector.GetSince().Before(rawL.UpdatedAt) { return nil, helper.ErrFinishCollect } } @@ -130,5 +130,5 @@ func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/github_graphql/tasks/job_collector.go b/backend/plugins/github_graphql/tasks/job_collector.go index 48ec17045..effac75ff 100644 --- a/backend/plugins/github_graphql/tasks/job_collector.go +++ b/backend/plugins/github_graphql/tasks/job_collector.go @@ -100,7 +100,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { db := taskCtx.GetDal() data := taskCtx.GetData().(*githubTasks.GithubTaskData) - collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ + apiCollector, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ Ctx: taskCtx, Params: githubTasks.GithubApiParams{ ConnectionId: data.Options.ConnectionId, @@ -118,8 +118,8 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { dal.Where("repo_id = ? 
and connection_id=?", data.Options.GithubId, data.Options.ConnectionId), dal.Orderby("github_updated_at DESC"), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("github_updated_at > ?", *collectorWithState.Since)) + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where("github_updated_at > ?", *apiCollector.GetSince())) } cursor, err := db.Cursor( @@ -134,7 +134,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { return err } - err = collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{ + err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{ Input: iterator, InputStep: 20, GraphqlClient: data.GraphqlClient, @@ -168,5 +168,5 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go b/backend/plugins/github_graphql/tasks/pr_collector.go index 2bc887305..903ce6c5b 100644 --- a/backend/plugins/github_graphql/tasks/pr_collector.go +++ b/backend/plugins/github_graphql/tasks/pr_collector.go @@ -130,7 +130,7 @@ var _ plugin.SubTaskEntryPoint = CollectPrs func CollectPrs(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*tasks.GithubTaskData) var err errors.Error - collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ + apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ Ctx: taskCtx, Params: tasks.GithubApiParams{ ConnectionId: data.Options.ConnectionId, @@ -142,7 +142,7 @@ func CollectPrs(taskCtx plugin.SubTaskContext) errors.Error { return err } - err = collectorWithState.InitGraphQLCollector(api.GraphqlCollectorArgs{ + err = apiCollector.InitGraphQLCollector(api.GraphqlCollectorArgs{ GraphqlClient: data.GraphqlClient, PageSize: 10, /* @@ -170,7 +170,7 @@ func CollectPrs(taskCtx 
plugin.SubTaskContext) errors.Error { query := iQuery.(*GraphqlQueryPrWrapper) prs := query.Repository.PullRequests.Prs for _, rawL := range prs { - if collectorWithState.Since != nil && !collectorWithState.Since.Before(rawL.CreatedAt) { + if apiCollector.GetSince() != nil && !apiCollector.GetSince().Before(rawL.CreatedAt) { return nil, api.ErrFinishCollect } } @@ -181,5 +181,5 @@ func CollectPrs(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/gitlab/tasks/deployment_collector.go b/backend/plugins/gitlab/tasks/deployment_collector.go index 6c4c2a74a..36ecb60c1 100644 --- a/backend/plugins/gitlab/tasks/deployment_collector.go +++ b/backend/plugins/gitlab/tasks/deployment_collector.go @@ -49,11 +49,11 @@ var CollectDeploymentMeta = plugin.SubTaskMeta{ func CollectDeployment(taskCtx plugin.SubTaskContext) errors.Error { rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_DEPLOYMENT) - collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) + apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) if err != nil { return err } - err = collectorWithState.InitCollector(helper.ApiCollectorArgs{ + err = apiCollector.InitCollector(helper.ApiCollectorArgs{ RawDataSubTaskArgs: *rawDataSubTaskArgs, ApiClient: data.ApiClient, PageSize: 100, @@ -70,11 +70,11 @@ func CollectDeployment(taskCtx plugin.SubTaskContext) errors.Error { } else { query.Set("order_by", "created_at") } - if collectorWithState.Since != nil { - query.Set("updated_after", collectorWithState.Since.Format(time.RFC3339)) + if apiCollector.GetSince() != nil { + query.Set("updated_after", apiCollector.GetSince().Format(time.RFC3339)) } - if collectorWithState.Before != nil { - query.Set("updated_before", collectorWithState.Before.Format(time.RFC3339)) + if apiCollector.GetUntil() != nil { + query.Set("updated_before", 
apiCollector.GetUntil().Format(time.RFC3339)) } return query, nil }, @@ -84,5 +84,5 @@ func CollectDeployment(taskCtx plugin.SubTaskContext) errors.Error { if err != nil { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/gitlab/tasks/issue_collector.go b/backend/plugins/gitlab/tasks/issue_collector.go index 92c3fbc21..9ae57fb4f 100644 --- a/backend/plugins/gitlab/tasks/issue_collector.go +++ b/backend/plugins/gitlab/tasks/issue_collector.go @@ -61,8 +61,8 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) errors.Error { */ Query: func(reqData *helper.RequestData) (url.Values, errors.Error) { query := url.Values{} - if collectorWithState.Since != nil { - query.Set("updated_after", collectorWithState.Since.Format(time.RFC3339)) + if collectorWithState.GetSince() != nil { + query.Set("updated_after", collectorWithState.GetSince().Format(time.RFC3339)) } query.Set("sort", "asc") query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page)) diff --git a/backend/plugins/gitlab/tasks/mr_collector.go b/backend/plugins/gitlab/tasks/mr_collector.go index 029b19043..320d977d2 100644 --- a/backend/plugins/gitlab/tasks/mr_collector.go +++ b/backend/plugins/gitlab/tasks/mr_collector.go @@ -43,12 +43,12 @@ var CollectApiMergeRequestsMeta = plugin.SubTaskMeta{ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext) errors.Error { rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_MERGE_REQUEST_TABLE) - collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) + apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) if err != nil { return err } - err = collectorWithState.InitCollector(helper.ApiCollectorArgs{ + err = apiCollector.InitCollector(helper.ApiCollectorArgs{ ApiClient: data.ApiClient, PageSize: 100, UrlTemplate: "projects/{{ .Params.ProjectId }}/merge_requests", @@ -59,8 +59,8 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext) errors.Error 
{ if err != nil { return nil, err } - if collectorWithState.Since != nil { - query.Set("updated_after", collectorWithState.Since.Format(time.RFC3339)) + if apiCollector.GetSince() != nil { + query.Set("updated_after", apiCollector.GetSince().Format(time.RFC3339)) } return query, nil }, @@ -70,5 +70,5 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/gitlab/tasks/mr_detail_collector.go b/backend/plugins/gitlab/tasks/mr_detail_collector.go index eb4f8f3a6..bfb64f031 100644 --- a/backend/plugins/gitlab/tasks/mr_detail_collector.go +++ b/backend/plugins/gitlab/tasks/mr_detail_collector.go @@ -74,7 +74,7 @@ func CollectApiMergeRequestDetails(taskCtx plugin.SubTaskContext) errors.Error { return collectorWithState.Execute() } -func GetMergeRequestDetailsIterator(taskCtx plugin.SubTaskContext, collectorWithState *helper.ApiCollectorStateManager) (*helper.DalCursorIterator, errors.Error) { +func GetMergeRequestDetailsIterator(taskCtx plugin.SubTaskContext, apiCollector *helper.StatefulApiCollector) (*helper.DalCursorIterator, errors.Error) { db := taskCtx.GetDal() data := taskCtx.GetData().(*GitlabTaskData) clauses := []dal.Clause{ @@ -85,8 +85,8 @@ func GetMergeRequestDetailsIterator(taskCtx plugin.SubTaskContext, collectorWith data.Options.ProjectId, data.Options.ConnectionId, true, ), } - if collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("gitlab_updated_at > ?", *collectorWithState.Since)) + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where("gitlab_updated_at > ?", *apiCollector.GetSince())) } // construct the input iterator diff --git a/backend/plugins/gitlab/tasks/pipeline_collector.go b/backend/plugins/gitlab/tasks/pipeline_collector.go index ceb911d68..77415506f 100644 --- a/backend/plugins/gitlab/tasks/pipeline_collector.go +++ 
b/backend/plugins/gitlab/tasks/pipeline_collector.go @@ -44,7 +44,7 @@ var CollectApiPipelinesMeta = plugin.SubTaskMeta{ func CollectApiPipelines(taskCtx plugin.SubTaskContext) errors.Error { rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_PIPELINE_TABLE) - collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) + apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) if err != nil { return err } @@ -54,7 +54,7 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext) errors.Error { return err } - err = collectorWithState.InitCollector(helper.ApiCollectorArgs{ + err = apiCollector.InitCollector(helper.ApiCollectorArgs{ RawDataSubTaskArgs: *rawDataSubTaskArgs, ApiClient: data.ApiClient, MinTickInterval: &tickInterval, @@ -62,8 +62,8 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext) errors.Error { UrlTemplate: "projects/{{ .Params.ProjectId }}/pipelines", Query: func(reqData *helper.RequestData) (url.Values, errors.Error) { query := url.Values{} - if collectorWithState.Since != nil { - query.Set("updated_after", collectorWithState.Since.Format(time.RFC3339)) + if apiCollector.GetSince() != nil { + query.Set("updated_after", apiCollector.GetSince().Format(time.RFC3339)) } query.Set("with_stats", "true") query.Set("sort", "asc") @@ -78,5 +78,5 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go index 227d19e93..a4bed6b0d 100644 --- a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go +++ b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go @@ -82,7 +82,7 @@ func CollectApiPipelineDetails(taskCtx plugin.SubTaskContext) errors.Error { return collectorWithState.Execute() } -func GetPipelinesIterator(taskCtx plugin.SubTaskContext, collectorWithState 
*helper.ApiCollectorStateManager) (*helper.DalCursorIterator, errors.Error) { +func GetPipelinesIterator(taskCtx plugin.SubTaskContext, apiCollector *helper.StatefulApiCollector) (*helper.DalCursorIterator, errors.Error) { db := taskCtx.GetDal() data := taskCtx.GetData().(*GitlabTaskData) clauses := []dal.Clause{ @@ -93,8 +93,8 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext, collectorWithState *hel data.Options.ProjectId, data.Options.ConnectionId, ), } - if collectorWithState.Since != nil && collectorWithState.IsIncremental { - clauses = append(clauses, dal.Where("gitlab_updated_at > ?", *collectorWithState.Since)) + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where("gitlab_updated_at > ?", *apiCollector.GetSince())) } // construct the input iterator cursor, err := db.Cursor(clauses...) diff --git a/backend/plugins/gitlab/tasks/shared.go b/backend/plugins/gitlab/tasks/shared.go index f20514961..4051c93b5 100644 --- a/backend/plugins/gitlab/tasks/shared.go +++ b/backend/plugins/gitlab/tasks/shared.go @@ -168,7 +168,7 @@ func CreateRawDataSubTaskArgs(taskCtx plugin.SubTaskContext, Table string) (*hel return rawDataSubTaskArgs, data } -func GetMergeRequestsIterator(taskCtx plugin.SubTaskContext, collectorWithState *helper.ApiCollectorStateManager) (*helper.DalCursorIterator, errors.Error) { +func GetMergeRequestsIterator(taskCtx plugin.SubTaskContext, apiCollector *helper.StatefulApiCollector) (*helper.DalCursorIterator, errors.Error) { db := taskCtx.GetDal() data := taskCtx.GetData().(*GitlabTaskData) clauses := []dal.Clause{ @@ -179,9 +179,9 @@ func GetMergeRequestsIterator(taskCtx plugin.SubTaskContext, collectorWithState data.Options.ProjectId, data.Options.ConnectionId, ), } - if collectorWithState != nil { - if collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("gitlab_updated_at > ?", *collectorWithState.Since)) + if apiCollector != nil { + if apiCollector.GetSince() != 
nil { + clauses = append(clauses, dal.Where("gitlab_updated_at > ?", *apiCollector.GetSince())) } } // construct the input iterator diff --git a/backend/plugins/gitlab/tasks/trigger_job_collector.go b/backend/plugins/gitlab/tasks/trigger_job_collector.go index 755eeb67a..37ceba215 100644 --- a/backend/plugins/gitlab/tasks/trigger_job_collector.go +++ b/backend/plugins/gitlab/tasks/trigger_job_collector.go @@ -44,7 +44,7 @@ var CollectApiTriggerJobsMeta = plugin.SubTaskMeta{ func CollectApiTriggerJobs(taskCtx plugin.SubTaskContext) errors.Error { rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_TRIGGER_JOB_TABLE) - collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) + apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) if err != nil { return err } @@ -52,17 +52,15 @@ func CollectApiTriggerJobs(taskCtx plugin.SubTaskContext) errors.Error { if err != nil { return err } - iterator, err := GetAllPipelinesIterator(taskCtx, collectorWithState) + iterator, err := GetAllPipelinesIterator(taskCtx, apiCollector) if err != nil { return err } - incremental := collectorWithState.IsIncremental - err = collectorWithState.InitCollector(helper.ApiCollectorArgs{ + err = apiCollector.InitCollector(helper.ApiCollectorArgs{ ApiClient: data.ApiClient, MinTickInterval: &tickInterval, PageSize: 100, - Incremental: incremental, Input: iterator, UrlTemplate: "projects/{{ .Params.ProjectId }}/pipelines/{{ .Input.GitlabId }}/bridges", ResponseParser: GetRawMessageFromResponse, @@ -73,10 +71,10 @@ func CollectApiTriggerJobs(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } -func GetAllPipelinesIterator(taskCtx plugin.SubTaskContext, collectorWithState *helper.ApiCollectorStateManager) (*helper.DalCursorIterator, errors.Error) { +func GetAllPipelinesIterator(taskCtx plugin.SubTaskContext, apiCollector *helper.StatefulApiCollector) 
(*helper.DalCursorIterator, errors.Error) { db := taskCtx.GetDal() data := taskCtx.GetData().(*GitlabTaskData) clauses := []dal.Clause{ @@ -87,8 +85,8 @@ func GetAllPipelinesIterator(taskCtx plugin.SubTaskContext, collectorWithState * data.Options.ProjectId, data.Options.ConnectionId, ), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("gitlab_updated_at > ?", collectorWithState.Since)) + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where("gitlab_updated_at > ?", apiCollector.GetSince())) } // construct the input iterator cursor, err := db.Cursor(clauses...) diff --git a/backend/plugins/jenkins/tasks/stage_collector.go b/backend/plugins/jenkins/tasks/stage_collector.go index 4503d8560..30501065a 100644 --- a/backend/plugins/jenkins/tasks/stage_collector.go +++ b/backend/plugins/jenkins/tasks/stage_collector.go @@ -49,7 +49,7 @@ func CollectApiStages(taskCtx plugin.SubTaskContext) errors.Error { db := taskCtx.GetDal() data := taskCtx.GetData().(*JenkinsTaskData) - collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ + apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ Params: JenkinsApiParams{ ConnectionId: data.Options.ConnectionId, FullName: data.Options.JobFullName, @@ -67,8 +67,8 @@ func CollectApiStages(taskCtx plugin.SubTaskContext) errors.Error { dal.Where(`tjb.connection_id = ? and tjb.job_path = ? and tjb.job_name = ? and tjb.class = ?`, data.Options.ConnectionId, data.Options.JobPath, data.Options.JobName, "WorkflowRun"), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { - clauses = append(clauses, dal.Where(`tjb.start_time >= ?`, collectorWithState.Since)) + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where(`tjb.start_time >= ?`, apiCollector.GetSince())) } cursor, err := db.Cursor(clauses...) 
if err != nil { @@ -81,7 +81,7 @@ func CollectApiStages(taskCtx plugin.SubTaskContext) errors.Error { return err } - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ ApiClient: data.ApiClient, Input: iterator, UrlTemplate: fmt.Sprintf("%sjob/%s/{{ .Input.Number }}/wfapi/describe", data.Options.JobPath, data.Options.JobName), @@ -109,5 +109,5 @@ func CollectApiStages(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/jira/tasks/development_panel_collector.go b/backend/plugins/jira/tasks/development_panel_collector.go index c513c5bf5..1381b5475 100644 --- a/backend/plugins/jira/tasks/development_panel_collector.go +++ b/backend/plugins/jira/tasks/development_panel_collector.go @@ -53,7 +53,7 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext) errors.Error { } db := taskCtx.GetDal() logger := taskCtx.GetLogger() - collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ + apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ Ctx: taskCtx, Params: JiraApiParams{ ConnectionId: data.Options.ConnectionId, @@ -72,8 +72,8 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext) errors.Error { dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"), dal.Where("bi.connection_id=? and bi.board_id = ?", data.Options.ConnectionId, data.Options.BoardId), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("i.updated > ?", collectorWithState.Since)) + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where("i.updated > ?", apiCollector.GetSince())) } cursor, err := db.Cursor(clauses...) 
if err != nil { @@ -87,7 +87,7 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext) errors.Error { return err } - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ ApiClient: data.ApiClient, Input: iterator, // the URL looks like: @@ -122,5 +122,5 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/jira/tasks/epic_collector.go b/backend/plugins/jira/tasks/epic_collector.go index 25cbd3998..f7f79ddf2 100644 --- a/backend/plugins/jira/tasks/epic_collector.go +++ b/backend/plugins/jira/tasks/epic_collector.go @@ -59,7 +59,7 @@ func CollectEpics(taskCtx plugin.SubTaskContext) errors.Error { return err } - collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ + apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ Ctx: taskCtx, Params: JiraApiParams{ ConnectionId: data.Options.ConnectionId, @@ -78,11 +78,11 @@ func CollectEpics(taskCtx plugin.SubTaskContext) errors.Error { logger.Info("got user's timezone: %v", loc.String()) } jql := "ORDER BY created ASC" - if collectorWithState.Since != nil { - jql = "and " + buildJQL(*collectorWithState.Since, loc) + if apiCollector.GetSince() != nil { + jql = "and " + buildJQL(*apiCollector.GetSince(), loc) } - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ ApiClient: data.ApiClient, PageSize: 100, Incremental: false, @@ -123,7 +123,7 @@ func CollectEpics(taskCtx plugin.SubTaskContext) errors.Error { if err != nil { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } func GetEpicKeysIterator(db dal.Dal, data *JiraTaskData, batchSize int) (api.Iterator, errors.Error) { diff --git a/backend/plugins/jira/tasks/issue_changelog_collector.go 
b/backend/plugins/jira/tasks/issue_changelog_collector.go index 9f29436c9..76c1eef38 100644 --- a/backend/plugins/jira/tasks/issue_changelog_collector.go +++ b/backend/plugins/jira/tasks/issue_changelog_collector.go @@ -53,7 +53,7 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext) errors.Error { logger := taskCtx.GetLogger() db := taskCtx.GetDal() - collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ + apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ Ctx: taskCtx, Params: JiraApiParams{ ConnectionId: data.Options.ConnectionId, @@ -71,8 +71,8 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext) errors.Error { dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"), dal.Where("bi.connection_id=? and bi.board_id = ? AND i.std_type != ? and i.changelog_total > 100", data.Options.ConnectionId, data.Options.BoardId, "Epic"), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("i.updated > ?", collectorWithState.Since)) + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where("i.updated > ?", apiCollector.GetSince())) } if logger.IsLevelEnabled(log.LOG_DEBUG) { @@ -95,7 +95,7 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext) errors.Error { } // now, let ApiCollector takes care the rest - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ ApiClient: data.ApiClient, PageSize: 100, GetTotalPages: GetTotalPagesFromResponse, @@ -125,5 +125,5 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/jira/tasks/issue_collector.go b/backend/plugins/jira/tasks/issue_collector.go index 3faaa9d51..84ac2d727 100644 --- 
a/backend/plugins/jira/tasks/issue_collector.go +++ b/backend/plugins/jira/tasks/issue_collector.go @@ -48,7 +48,7 @@ var CollectIssuesMeta = plugin.SubTaskMeta{ func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*JiraTaskData) logger := taskCtx.GetLogger() - collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ + apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ Ctx: taskCtx, /* This struct will be JSONEncoded and stored into database along with raw data itself, to identity minimal @@ -77,11 +77,11 @@ func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error { logger.Info("got user's timezone: %v", loc.String()) } jql := "ORDER BY created ASC" - if collectorWithState.Since != nil { - jql = buildJQL(*collectorWithState.Since, loc) + if apiCollector.GetSince() != nil { + jql = buildJQL(*apiCollector.GetSince(), loc) } - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ ApiClient: data.ApiClient, PageSize: data.Options.PageSize, /* @@ -143,7 +143,7 @@ func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } // buildJQL build jql based on timeAfter and incremental mode diff --git a/backend/plugins/jira/tasks/issue_comment_collector.go b/backend/plugins/jira/tasks/issue_comment_collector.go index aa6ca2cdd..5ef2777be 100644 --- a/backend/plugins/jira/tasks/issue_comment_collector.go +++ b/backend/plugins/jira/tasks/issue_comment_collector.go @@ -53,7 +53,7 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext) errors.Error { logger := taskCtx.GetLogger() db := taskCtx.GetDal() - collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ + apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ Ctx: taskCtx, Params: JiraApiParams{ ConnectionId: data.Options.ConnectionId, @@ 
-71,8 +71,8 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext) errors.Error { dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"), dal.Where("bi.connection_id=? and bi.board_id = ? AND i.std_type != ? AND i.comment_total > 100", data.Options.ConnectionId, data.Options.BoardId, "Epic"), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("i.updated > ?", collectorWithState.Since)) + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where("i.updated > ?", apiCollector.GetSince())) } if logger.IsLevelEnabled(log.LOG_DEBUG) { count, err := db.Count(clauses...) @@ -94,7 +94,7 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext) errors.Error { } // now, let ApiCollector takes care the rest - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ ApiClient: data.ApiClient, PageSize: 100, GetTotalPages: GetTotalPagesFromResponse, @@ -124,5 +124,5 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/jira/tasks/remotelink_collector.go b/backend/plugins/jira/tasks/remotelink_collector.go index 2072b1f28..6b44ca7e5 100644 --- a/backend/plugins/jira/tasks/remotelink_collector.go +++ b/backend/plugins/jira/tasks/remotelink_collector.go @@ -52,7 +52,7 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext) errors.Error { logger := taskCtx.GetLogger() logger.Info("collect remotelink") - collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ + apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ Ctx: taskCtx, Params: JiraApiParams{ ConnectionId: data.Options.ConnectionId, @@ -70,8 +70,8 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext) errors.Error 
{ dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"), dal.Where("bi.connection_id=? and bi.board_id = ?", data.Options.ConnectionId, data.Options.BoardId), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("i.updated > ?", collectorWithState.Since)) + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where("i.updated > ?", apiCollector.GetSince())) } cursor, err := db.Cursor(clauses...) if err != nil { @@ -85,7 +85,7 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext) errors.Error { return err } - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ ApiClient: data.ApiClient, Input: iterator, UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/remotelink", @@ -105,7 +105,7 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext) errors.Error { if err != nil { return err } - err = collectorWithState.Execute() + err = apiCollector.Execute() if err != nil { return err } diff --git a/backend/plugins/jira/tasks/worklog_collector.go b/backend/plugins/jira/tasks/worklog_collector.go index a22005bc3..8112a6d0a 100644 --- a/backend/plugins/jira/tasks/worklog_collector.go +++ b/backend/plugins/jira/tasks/worklog_collector.go @@ -45,7 +45,7 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) errors.Error { logger := taskCtx.GetLogger() - collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ + apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ Ctx: taskCtx, Params: JiraApiParams{ ConnectionId: data.Options.ConnectionId, @@ -66,8 +66,14 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) errors.Error { dal.Where("i.updated > i.created AND bi.connection_id = ? AND bi.board_id = ? 
", data.Options.ConnectionId, data.Options.BoardId), dal.Groupby("i.issue_id, i.updated"), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { - clauses = append(clauses, dal.Having("i.updated > ? AND (i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND COUNT(wl.worklog_id) > 0))", collectorWithState.Since)) + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { + clauses = append( + clauses, + dal.Having( + "i.updated > ? AND (i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND COUNT(wl.worklog_id) > 0))", + apiCollector.GetSince(), + ), + ) } else { /* i.updated > max(wl.issue_updated) was deleted because for non-incremental collection, @@ -89,7 +95,7 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) errors.Error { return err } - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ Input: iterator, ApiClient: data.ApiClient, UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/worklog", @@ -112,5 +118,5 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } diff --git a/backend/plugins/tapd/tasks/bug_changelog_collector.go b/backend/plugins/tapd/tasks/bug_changelog_collector.go index bf768e0d1..0822eeee9 100644 --- a/backend/plugins/tapd/tasks/bug_changelog_collector.go +++ b/backend/plugins/tapd/tasks/bug_changelog_collector.go @@ -34,12 +34,12 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext) errors.Error { rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_BUG_CHANGELOG_TABLE) logger := taskCtx.GetLogger() logger.Info("collect storyChangelogs") - collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) + apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) if err != nil { return err } - err = collectorWithState.InitCollector(helper.ApiCollectorArgs{ + err 
= apiCollector.InitCollector(helper.ApiCollectorArgs{ ApiClient: data.ApiClient, PageSize: int(data.Options.PageSize), UrlTemplate: "bug_changes", @@ -49,8 +49,8 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext) errors.Error { query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page)) query.Set("limit", fmt.Sprintf("%v", reqData.Pager.Size)) query.Set("order", "created desc") - if collectorWithState.Since != nil { - query.Set("created", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02"))) + if apiCollector.GetSince() != nil { + query.Set("created", fmt.Sprintf(">%s", apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02"))) } return query, nil }, @@ -60,7 +60,7 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext) errors.Error { logger.Error(err, "collect story changelog error") return err } - return collectorWithState.Execute() + return apiCollector.Execute() } var CollectBugChangelogMeta = plugin.SubTaskMeta{ diff --git a/backend/plugins/tapd/tasks/bug_collector.go b/backend/plugins/tapd/tasks/bug_collector.go index 746c1bd89..56479b139 100644 --- a/backend/plugins/tapd/tasks/bug_collector.go +++ b/backend/plugins/tapd/tasks/bug_collector.go @@ -34,12 +34,12 @@ func CollectBugs(taskCtx plugin.SubTaskContext) errors.Error { rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_BUG_TABLE) logger := taskCtx.GetLogger() logger.Info("collect bugs") - collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) + apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) if err != nil { return err } - err = collectorWithState.InitCollector(helper.ApiCollectorArgs{ + err = apiCollector.InitCollector(helper.ApiCollectorArgs{ ApiClient: data.ApiClient, PageSize: int(data.Options.PageSize), UrlTemplate: "bugs", @@ -50,8 +50,8 @@ func CollectBugs(taskCtx plugin.SubTaskContext) errors.Error { query.Set("limit", fmt.Sprintf("%v", reqData.Pager.Size)) 
query.Set("fields", "labels") query.Set("order", "created asc") - if collectorWithState.Since != nil { - query.Set("modified", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02"))) + if apiCollector.GetSince() != nil { + query.Set("modified", fmt.Sprintf(">%s", apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02"))) } return query, nil }, @@ -61,7 +61,7 @@ func CollectBugs(taskCtx plugin.SubTaskContext) errors.Error { logger.Error(err, "collect bug error") return err } - return collectorWithState.Execute() + return apiCollector.Execute() } var CollectBugMeta = plugin.SubTaskMeta{ diff --git a/backend/plugins/tapd/tasks/bug_commit_collector.go b/backend/plugins/tapd/tasks/bug_commit_collector.go index 3164adb05..f3abeafe8 100644 --- a/backend/plugins/tapd/tasks/bug_commit_collector.go +++ b/backend/plugins/tapd/tasks/bug_commit_collector.go @@ -38,7 +38,7 @@ var _ plugin.SubTaskEntryPoint = CollectBugCommits func CollectBugCommits(taskCtx plugin.SubTaskContext) errors.Error { rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_BUG_COMMIT_TABLE) db := taskCtx.GetDal() - collectorWithState, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs) + apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs) if err != nil { return err } @@ -50,8 +50,8 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) errors.Error { dal.From(&models.TapdBug{}), dal.Where("_tool_tapd_bugs.connection_id = ? and _tool_tapd_bugs.workspace_id = ? ", data.Options.ConnectionId, data.Options.WorkspaceId), } - if collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("modified > ?", *collectorWithState.Since)) + if apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where("modified > ?", *apiCollector.GetSince())) } cursor, err := db.Cursor(clauses...) 
if err != nil { @@ -62,7 +62,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) errors.Error { if err != nil { return err } - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ ApiClient: data.ApiClient, Input: iterator, UrlTemplate: "code_commit_infos", @@ -87,7 +87,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) errors.Error { logger.Error(err, "collect issueCommit error") return err } - return collectorWithState.Execute() + return apiCollector.Execute() } var CollectBugCommitMeta = plugin.SubTaskMeta{ diff --git a/backend/plugins/tapd/tasks/iteration_collector.go b/backend/plugins/tapd/tasks/iteration_collector.go index 5f5512b1f..1fc7b72aa 100644 --- a/backend/plugins/tapd/tasks/iteration_collector.go +++ b/backend/plugins/tapd/tasks/iteration_collector.go @@ -36,12 +36,12 @@ func CollectIterations(taskCtx plugin.SubTaskContext) errors.Error { rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_ITERATION_TABLE) logger := taskCtx.GetLogger() logger.Info("collect iterations") - collectorWithState, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs) + apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs) if err != nil { return err } - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ ApiClient: data.ApiClient, PageSize: int(data.Options.PageSize), Concurrency: 3, @@ -52,8 +52,8 @@ func CollectIterations(taskCtx plugin.SubTaskContext) errors.Error { query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page)) query.Set("limit", fmt.Sprintf("%v", reqData.Pager.Size)) query.Set("order", "created asc") - if collectorWithState.Since != nil { - query.Set("modified", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02"))) + if apiCollector.GetSince() != nil { + query.Set("modified", fmt.Sprintf(">%s", 
apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02"))) } return query, nil }, @@ -69,7 +69,7 @@ func CollectIterations(taskCtx plugin.SubTaskContext) errors.Error { logger.Error(err, "collect iteration error") return err } - return collectorWithState.Execute() + return apiCollector.Execute() } var CollectIterationMeta = plugin.SubTaskMeta{ diff --git a/backend/plugins/tapd/tasks/story_bug_collector.go b/backend/plugins/tapd/tasks/story_bug_collector.go index 9d5d0ed0c..315498242 100644 --- a/backend/plugins/tapd/tasks/story_bug_collector.go +++ b/backend/plugins/tapd/tasks/story_bug_collector.go @@ -36,7 +36,7 @@ var _ plugin.SubTaskEntryPoint = CollectStoryBugs func CollectStoryBugs(taskCtx plugin.SubTaskContext) errors.Error { rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_STORY_BUG_TABLE) db := taskCtx.GetDal() - collectorWithState, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs) + apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs) if err != nil { return err } @@ -48,8 +48,8 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext) errors.Error { dal.From(&models.TapdStory{}), dal.Where("_tool_tapd_stories.connection_id = ? and _tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId, data.Options.WorkspaceId), } - if collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("modified > ?", *collectorWithState.Since)) + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where("modified > ?", *apiCollector.GetSince())) } cursor, err := db.Cursor(clauses...) 
if err != nil { @@ -59,7 +59,7 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext) errors.Error { if err != nil { return err } - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ ApiClient: data.ApiClient, Input: iterator, UrlTemplate: "stories/get_related_bugs", @@ -76,7 +76,7 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext) errors.Error { logger.Error(err, "collect storyBug error") return err } - return collectorWithState.Execute() + return apiCollector.Execute() } var CollectStoryBugMeta = plugin.SubTaskMeta{ diff --git a/backend/plugins/tapd/tasks/story_changelog_collector.go b/backend/plugins/tapd/tasks/story_changelog_collector.go index d0597b7ef..83d195e26 100644 --- a/backend/plugins/tapd/tasks/story_changelog_collector.go +++ b/backend/plugins/tapd/tasks/story_changelog_collector.go @@ -49,8 +49,8 @@ func CollectStoryChangelogs(taskCtx plugin.SubTaskContext) errors.Error { query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page)) query.Set("limit", fmt.Sprintf("%v", reqData.Pager.Size)) query.Set("order", "created asc") - if collectorWithState.Since != nil { - query.Set("created", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02"))) + if collectorWithState.GetSince() != nil { + query.Set("created", fmt.Sprintf(">%s", collectorWithState.GetSince().In(data.Options.CstZone).Format("2006-01-02"))) } return query, nil }, diff --git a/backend/plugins/tapd/tasks/story_collector.go b/backend/plugins/tapd/tasks/story_collector.go index 095c72b3d..f030abbef 100644 --- a/backend/plugins/tapd/tasks/story_collector.go +++ b/backend/plugins/tapd/tasks/story_collector.go @@ -34,12 +34,12 @@ func CollectStorys(taskCtx plugin.SubTaskContext) errors.Error { rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_STORY_TABLE) logger := taskCtx.GetLogger() logger.Info("collect stories") - collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs) + apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) if err != nil { return err } - err = collectorWithState.InitCollector(helper.ApiCollectorArgs{ + err = apiCollector.InitCollector(helper.ApiCollectorArgs{ ApiClient: data.ApiClient, PageSize: int(data.Options.PageSize), UrlTemplate: "stories", @@ -50,8 +50,8 @@ func CollectStorys(taskCtx plugin.SubTaskContext) errors.Error { query.Set("limit", fmt.Sprintf("%v", reqData.Pager.Size)) query.Set("fields", "labels") query.Set("order", "created asc") - if collectorWithState.Since != nil { - query.Set("modified", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02"))) + if apiCollector.GetSince() != nil { + query.Set("modified", fmt.Sprintf(">%s", apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02"))) } return query, nil }, @@ -61,7 +61,7 @@ func CollectStorys(taskCtx plugin.SubTaskContext) errors.Error { logger.Error(err, "collect story error") return err } - return collectorWithState.Execute() + return apiCollector.Execute() } var CollectStoryMeta = plugin.SubTaskMeta{ diff --git a/backend/plugins/tapd/tasks/story_commit_collector.go b/backend/plugins/tapd/tasks/story_commit_collector.go index 708c71010..21044f3be 100644 --- a/backend/plugins/tapd/tasks/story_commit_collector.go +++ b/backend/plugins/tapd/tasks/story_commit_collector.go @@ -38,7 +38,7 @@ var _ plugin.SubTaskEntryPoint = CollectStoryCommits func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error { rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_STORY_COMMIT_TABLE) db := taskCtx.GetDal() - collectorWithState, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs) + apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs) if err != nil { return err } @@ -49,8 +49,8 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error { dal.From(&models.TapdStory{}), 
dal.Where("_tool_tapd_stories.connection_id = ? and _tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId, data.Options.WorkspaceId), } - if collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("modified > ?", *collectorWithState.Since)) + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where("modified > ?", *apiCollector.GetSince())) } cursor, err := db.Cursor(clauses...) if err != nil { @@ -60,7 +60,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error { if err != nil { return err } - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ ApiClient: data.ApiClient, Input: iterator, UrlTemplate: "code_commit_infos", @@ -85,7 +85,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error { logger.Error(err, "collect issueCommit error") return err } - return collectorWithState.Execute() + return apiCollector.Execute() } var CollectStoryCommitMeta = plugin.SubTaskMeta{ diff --git a/backend/plugins/tapd/tasks/task_changelog_collector.go b/backend/plugins/tapd/tasks/task_changelog_collector.go index 7aba00c2a..66aa60b9e 100644 --- a/backend/plugins/tapd/tasks/task_changelog_collector.go +++ b/backend/plugins/tapd/tasks/task_changelog_collector.go @@ -32,13 +32,13 @@ var _ plugin.SubTaskEntryPoint = CollectTaskChangelogs func CollectTaskChangelogs(taskCtx plugin.SubTaskContext) errors.Error { rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_TASK_CHANGELOG_TABLE) - collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) + apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) if err != nil { return err } logger := taskCtx.GetLogger() logger.Info("collect taskChangelogs") - err = collectorWithState.InitCollector(helper.ApiCollectorArgs{ + err = apiCollector.InitCollector(helper.ApiCollectorArgs{ ApiClient: data.ApiClient, PageSize: 
int(data.Options.PageSize), UrlTemplate: "task_changes", @@ -48,8 +48,8 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext) errors.Error { query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page)) query.Set("limit", fmt.Sprintf("%v", reqData.Pager.Size)) query.Set("order", "created asc") - if collectorWithState.Since != nil { - query.Set("created", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02"))) + if apiCollector.GetSince() != nil { + query.Set("created", fmt.Sprintf(">%s", apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02"))) } return query, nil }, @@ -59,7 +59,7 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext) errors.Error { logger.Error(err, "collect task changelog error") return err } - return collectorWithState.Execute() + return apiCollector.Execute() } var CollectTaskChangelogMeta = plugin.SubTaskMeta{ diff --git a/backend/plugins/tapd/tasks/task_collector.go b/backend/plugins/tapd/tasks/task_collector.go index 2d5bfdba3..9b5beb3a4 100644 --- a/backend/plugins/tapd/tasks/task_collector.go +++ b/backend/plugins/tapd/tasks/task_collector.go @@ -34,7 +34,7 @@ func CollectTasks(taskCtx plugin.SubTaskContext) errors.Error { rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_TASK_TABLE) logger := taskCtx.GetLogger() logger.Info("collect tasks") - collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) + apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) if err != nil { return err } @@ -51,8 +51,8 @@ func CollectTasks(taskCtx plugin.SubTaskContext) errors.Error { query.Set("limit", fmt.Sprintf("%v", reqData.Pager.Size)) query.Set("fields", "labels") query.Set("order", "created asc") - if collectorWithState.Since != nil { - query.Set("modified", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02"))) + if apiCollector.GetSince() != nil { + query.Set("modified", fmt.Sprintf(">%s", 
apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02"))) } return query, nil }, diff --git a/backend/plugins/tapd/tasks/task_commit_collector.go b/backend/plugins/tapd/tasks/task_commit_collector.go index 0ef938997..56c374d3a 100644 --- a/backend/plugins/tapd/tasks/task_commit_collector.go +++ b/backend/plugins/tapd/tasks/task_commit_collector.go @@ -38,7 +38,7 @@ var _ plugin.SubTaskEntryPoint = CollectTaskCommits func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error { rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_TASK_COMMIT_TABLE) db := taskCtx.GetDal() - collectorWithState, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs) + apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs) if err != nil { return err } @@ -49,8 +49,8 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error { dal.From(&models.TapdTask{}), dal.Where("_tool_tapd_tasks.connection_id = ? and _tool_tapd_tasks.workspace_id = ? ", data.Options.ConnectionId, data.Options.WorkspaceId), } - if collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("modified > ?", *collectorWithState.Since)) + if apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where("modified > ?", *apiCollector.GetSince())) } cursor, err := db.Cursor(clauses...) 
if err != nil { @@ -60,7 +60,7 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error { if err != nil { return err } - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ ApiClient: data.ApiClient, Input: iterator, UrlTemplate: "code_commit_infos", @@ -85,7 +85,7 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error { logger.Error(err, "collect issueCommit error") return err } - return collectorWithState.Execute() + return apiCollector.Execute() } var CollectTaskCommitMeta = plugin.SubTaskMeta{ diff --git a/backend/plugins/tapd/tasks/worklog_collector.go b/backend/plugins/tapd/tasks/worklog_collector.go index b091f9e44..c651a2038 100644 --- a/backend/plugins/tapd/tasks/worklog_collector.go +++ b/backend/plugins/tapd/tasks/worklog_collector.go @@ -34,12 +34,12 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) errors.Error { rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_WORKLOG_TABLE) logger := taskCtx.GetLogger() logger.Info("collect worklogs") - collectorWithState, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) + apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs) if err != nil { return err } - err = collectorWithState.InitCollector(helper.ApiCollectorArgs{ + err = apiCollector.InitCollector(helper.ApiCollectorArgs{ ApiClient: data.ApiClient, PageSize: int(data.Options.PageSize), UrlTemplate: "timesheets", @@ -49,8 +49,8 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) errors.Error { query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page)) query.Set("limit", fmt.Sprintf("%v", reqData.Pager.Size)) query.Set("order", "created asc") - if collectorWithState.Since != nil { - query.Set("modified", fmt.Sprintf(">%s", collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02"))) + if apiCollector.GetSince() != nil { + query.Set("modified", fmt.Sprintf(">%s", 
apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02"))) } return query, nil }, @@ -60,7 +60,7 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) errors.Error { logger.Error(err, "collect worklog error") return err } - return collectorWithState.Execute() + return apiCollector.Execute() } var CollectWorklogMeta = plugin.SubTaskMeta{ diff --git a/backend/plugins/zentao/tasks/bug_commits_collector.go b/backend/plugins/zentao/tasks/bug_commits_collector.go index c1ee2f74e..bf2a44116 100644 --- a/backend/plugins/zentao/tasks/bug_commits_collector.go +++ b/backend/plugins/zentao/tasks/bug_commits_collector.go @@ -52,7 +52,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*ZentaoTaskData) // state manager - collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ + apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ Ctx: taskCtx, Options: data.Options, Table: RAW_BUG_COMMITS_TABLE, @@ -70,8 +70,8 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) errors.Error { data.Options.ProjectId, data.Options.ConnectionId, ), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", collectorWithState.Since)) + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", apiCollector.GetSince())) } cursor, err := db.Cursor(clauses...) 
if err != nil { @@ -82,7 +82,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) errors.Error { return err } // collect bug commits - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ RawDataSubTaskArgs: api.RawDataSubTaskArgs{ Ctx: taskCtx, Options: data.Options, @@ -111,7 +111,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } type SimpleZentaoBug struct { diff --git a/backend/plugins/zentao/tasks/story_commits_collector.go b/backend/plugins/zentao/tasks/story_commits_collector.go index 489ad2945..85404a39f 100644 --- a/backend/plugins/zentao/tasks/story_commits_collector.go +++ b/backend/plugins/zentao/tasks/story_commits_collector.go @@ -47,7 +47,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*ZentaoTaskData) // state manager - collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ + apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ Ctx: taskCtx, Options: data.Options, Table: RAW_STORY_COMMITS_TABLE, @@ -66,8 +66,8 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error { dal.Where(`_tool_zentao_project_stories.project_id = ? and _tool_zentao_project_stories.connection_id = ?`, data.Options.ProjectId, data.Options.ConnectionId), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", collectorWithState.Since)) + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", apiCollector.GetSince())) } cursor, err := db.Cursor(clauses...) 
if err != nil { @@ -80,7 +80,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error { } // collect story commits - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ RawDataSubTaskArgs: api.RawDataSubTaskArgs{ Ctx: taskCtx, Options: data.Options, @@ -109,7 +109,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() } type inputWithLastEditedDate struct { diff --git a/backend/plugins/zentao/tasks/task_commits_collector.go b/backend/plugins/zentao/tasks/task_commits_collector.go index 5a3204302..219a500fd 100644 --- a/backend/plugins/zentao/tasks/task_commits_collector.go +++ b/backend/plugins/zentao/tasks/task_commits_collector.go @@ -46,7 +46,7 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*ZentaoTaskData) // state manager - collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ + apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{ Ctx: taskCtx, Options: data.Options, Table: RAW_TASK_COMMITS_TABLE, @@ -64,8 +64,8 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error { data.Options.ProjectId, data.Options.ConnectionId, ), } - if collectorWithState.IsIncremental && collectorWithState.Since != nil { - clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", collectorWithState.Since)) + if apiCollector.IsIncremental() && apiCollector.GetSince() != nil { + clauses = append(clauses, dal.Where("last_edited_date is not null and last_edited_date > ?", apiCollector.GetSince())) } cursor, err := db.Cursor(clauses...) 
if err != nil { @@ -78,7 +78,7 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error { } // collect task commits - err = collectorWithState.InitCollector(api.ApiCollectorArgs{ + err = apiCollector.InitCollector(api.ApiCollectorArgs{ RawDataSubTaskArgs: api.RawDataSubTaskArgs{ Ctx: taskCtx, Options: data.Options, @@ -107,5 +107,5 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error { return err } - return collectorWithState.Execute() + return apiCollector.Execute() }
