This is an automated email from the ASF dual-hosted git repository.

likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new bf25f9be6 feat(tapd): add time filter (#4075)
bf25f9be6 is described below

commit bf25f9be63511b5e0713ff1c25568849cb5d8534
Author: Warren Chen <[email protected]>
AuthorDate: Fri Dec 30 15:29:38 2022 +0800

    feat(tapd): add time filter (#4075)
---
 plugins/github/github.go                           |  4 +-
 plugins/github/tasks/cicd_run_collector.go         |  2 +-
 plugins/github/tasks/comment_collector.go          |  2 +-
 plugins/github/tasks/commit_collector.go           |  2 +-
 plugins/github/tasks/issue_collector.go            |  2 +-
 plugins/github/tasks/pr_commit_collector.go        |  2 +-
 plugins/github/tasks/pr_review_collector.go        |  2 +-
 .../github/tasks/pr_review_comment_collector.go    |  2 +-
 .../github_graphql/tasks/check_run_collector.go    |  2 +-
 plugins/gitlab/tasks/issue_collector.go            |  2 +-
 plugins/gitlab/tasks/mr_collector.go               |  2 +-
 plugins/gitlab/tasks/pipeline_collector.go         |  2 +-
 plugins/helper/api_collector_with_state.go         |  4 +-
 plugins/jira/tasks/issue_changelog_collector.go    |  2 +-
 plugins/jira/tasks/issue_collector.go              |  2 +-
 plugins/jira/tasks/remotelink_collector.go         |  2 +-
 plugins/jira/tasks/worklog_collector.go            |  2 +-
 plugins/tapd/impl/impl.go                          | 23 ++++-----
 plugins/tapd/tapd.go                               |  8 ++--
 plugins/tapd/tasks/bug_changelog_collector.go      | 54 +++++++++------------
 plugins/tapd/tasks/bug_collector.go                | 52 ++++++++------------
 plugins/tapd/tasks/bug_commit_collector.go         | 28 ++++++-----
 plugins/tapd/tasks/iteration_collector.go          | 55 +++++++++-------------
 plugins/tapd/tasks/story_bug_collector.go          | 27 +++++++----
 plugins/tapd/tasks/story_changelog_collector.go    | 54 +++++++++------------
 plugins/tapd/tasks/story_collector.go              | 52 ++++++++------------
 plugins/tapd/tasks/story_commit_collector.go       | 27 +++++++----
 plugins/tapd/tasks/task_changelog_collector.go     | 54 +++++++++------------
 plugins/tapd/tasks/task_collector.go               | 41 ++++++----------
 plugins/tapd/tasks/task_commit_collector.go        | 23 +++++----
 plugins/tapd/tasks/task_data.go                    | 10 ++--
 plugins/tapd/tasks/worklog_collector.go            | 53 +++++++++------------
 32 files changed, 267 insertions(+), 332 deletions(-)

diff --git a/plugins/github/github.go b/plugins/github/github.go
index 6ee06b22c..9368e7ccc 100644
--- a/plugins/github/github.go
+++ b/plugins/github/github.go
@@ -32,7 +32,7 @@ func main() {
        connectionId := cmd.Flags().Uint64P("connectionId", "c", 0, "github 
connection id")
        owner := cmd.Flags().StringP("owner", "o", "", "github owner")
        repo := cmd.Flags().StringP("repo", "r", "", "github repo")
-       CreatedDateAfter := cmd.Flags().StringP("createdDateAfter", "a", "", 
"collect data that are created after specified time, ie 2006-05-06T07:08:09Z")
+       createdDateAfter := cmd.Flags().StringP("createdDateAfter", "a", "", 
"collect data that are created after specified time, ie 2006-05-06T07:08:09Z")
        _ = cmd.MarkFlagRequired("connectionId")
        _ = cmd.MarkFlagRequired("owner")
        _ = cmd.MarkFlagRequired("repo")
@@ -53,7 +53,7 @@ func main() {
                        "connectionId":     *connectionId,
                        "owner":            *owner,
                        "repo":             *repo,
-                       "createdDateAfter": *CreatedDateAfter,
+                       "createdDateAfter": *createdDateAfter,
                        "transformationRules": map[string]interface{}{
                                "prType":               *prType,
                                "prComponent":          *prComponent,
diff --git a/plugins/github/tasks/cicd_run_collector.go 
b/plugins/github/tasks/cicd_run_collector.go
index 40440298d..ff0ea7cb2 100644
--- a/plugins/github/tasks/cicd_run_collector.go
+++ b/plugins/github/tasks/cicd_run_collector.go
@@ -51,7 +51,7 @@ func CollectRuns(taskCtx core.SubTaskContext) errors.Error {
                return err
        }
 
-       //incremental := collectorWithState.CanIncrementCollect()
+       //incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient: data.ApiClient,
                PageSize:  30,
diff --git a/plugins/github/tasks/comment_collector.go 
b/plugins/github/tasks/comment_collector.go
index b295999a6..5d949747b 100644
--- a/plugins/github/tasks/comment_collector.go
+++ b/plugins/github/tasks/comment_collector.go
@@ -44,7 +44,7 @@ func CollectApiComments(taskCtx core.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.CanIncrementCollect()
+       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
diff --git a/plugins/github/tasks/commit_collector.go 
b/plugins/github/tasks/commit_collector.go
index 8fb8e7685..b223140fd 100644
--- a/plugins/github/tasks/commit_collector.go
+++ b/plugins/github/tasks/commit_collector.go
@@ -52,7 +52,7 @@ func CollectApiCommits(taskCtx core.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.CanIncrementCollect()
+       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
diff --git a/plugins/github/tasks/issue_collector.go 
b/plugins/github/tasks/issue_collector.go
index d668a8960..4478cb1d5 100644
--- a/plugins/github/tasks/issue_collector.go
+++ b/plugins/github/tasks/issue_collector.go
@@ -52,7 +52,7 @@ func CollectApiIssues(taskCtx core.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.CanIncrementCollect()
+       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
diff --git a/plugins/github/tasks/pr_commit_collector.go 
b/plugins/github/tasks/pr_commit_collector.go
index 5e62d431f..cb993071e 100644
--- a/plugins/github/tasks/pr_commit_collector.go
+++ b/plugins/github/tasks/pr_commit_collector.go
@@ -66,7 +66,7 @@ func CollectApiPullRequestCommits(taskCtx 
core.SubTaskContext) errors.Error {
                return err
        }
 
-       incremental := collectorWithState.CanIncrementCollect()
+       incremental := collectorWithState.IsIncremental()
 
        clauses := []dal.Clause{
                dal.Select("number, github_id"),
diff --git a/plugins/github/tasks/pr_review_collector.go 
b/plugins/github/tasks/pr_review_collector.go
index 7d2d6f2eb..bf1e823b7 100644
--- a/plugins/github/tasks/pr_review_collector.go
+++ b/plugins/github/tasks/pr_review_collector.go
@@ -59,7 +59,7 @@ func CollectApiPullRequestReviews(taskCtx 
core.SubTaskContext) errors.Error {
                return err
        }
 
-       incremental := collectorWithState.CanIncrementCollect()
+       incremental := collectorWithState.IsIncremental()
        clauses := []dal.Clause{
                dal.Select("number, github_id"),
                dal.From(models.GithubPullRequest{}.TableName()),
diff --git a/plugins/github/tasks/pr_review_comment_collector.go 
b/plugins/github/tasks/pr_review_comment_collector.go
index 7d9e1a6b6..2f4829102 100644
--- a/plugins/github/tasks/pr_review_comment_collector.go
+++ b/plugins/github/tasks/pr_review_comment_collector.go
@@ -47,7 +47,7 @@ func CollectPrReviewComments(taskCtx core.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.CanIncrementCollect()
+       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
diff --git a/plugins/github_graphql/tasks/check_run_collector.go 
b/plugins/github_graphql/tasks/check_run_collector.go
index 2af9b0aac..68f5ab752 100644
--- a/plugins/github_graphql/tasks/check_run_collector.go
+++ b/plugins/github_graphql/tasks/check_run_collector.go
@@ -112,7 +112,7 @@ func CollectCheckRun(taskCtx core.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.CanIncrementCollect()
+       incremental := collectorWithState.IsIncremental()
 
        clauses := []dal.Clause{
                dal.Select("check_suite_node_id"),
diff --git a/plugins/gitlab/tasks/issue_collector.go 
b/plugins/gitlab/tasks/issue_collector.go
index 5592dfe72..df8e4c01d 100644
--- a/plugins/gitlab/tasks/issue_collector.go
+++ b/plugins/gitlab/tasks/issue_collector.go
@@ -45,7 +45,7 @@ func CollectApiIssues(taskCtx core.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.CanIncrementCollect()
+       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
diff --git a/plugins/gitlab/tasks/mr_collector.go 
b/plugins/gitlab/tasks/mr_collector.go
index e4dddcf8d..6052ae132 100644
--- a/plugins/gitlab/tasks/mr_collector.go
+++ b/plugins/gitlab/tasks/mr_collector.go
@@ -42,7 +42,7 @@ func CollectApiMergeRequests(taskCtx core.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.CanIncrementCollect()
+       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:      data.ApiClient,
                PageSize:       100,
diff --git a/plugins/gitlab/tasks/pipeline_collector.go 
b/plugins/gitlab/tasks/pipeline_collector.go
index d47a27992..a9f3a0a28 100644
--- a/plugins/gitlab/tasks/pipeline_collector.go
+++ b/plugins/gitlab/tasks/pipeline_collector.go
@@ -42,7 +42,7 @@ func CollectApiPipelines(taskCtx core.SubTaskContext) 
errors.Error {
                return err
        }
 
-       incremental := collectorWithState.CanIncrementCollect()
+       incremental := collectorWithState.IsIncremental()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                RawDataSubTaskArgs: *rawDataSubTaskArgs,
                ApiClient:          data.ApiClient,
diff --git a/plugins/helper/api_collector_with_state.go 
b/plugins/helper/api_collector_with_state.go
index 2e2064b74..c8a7a648a 100644
--- a/plugins/helper/api_collector_with_state.go
+++ b/plugins/helper/api_collector_with_state.go
@@ -63,12 +63,12 @@ func NewApiCollectorWithState(args RawDataSubTaskArgs, 
createdDateAfter *time.Ti
        }, nil
 }
 
-// CanIncrementCollect return if the old data can support collect 
incrementally.
+// IsIncremental return if the old data can support collect incrementally.
 // only when latest collection is success &&
 // (m.LatestState.CreatedDateAfter == nil means all data have been collected ||
 // CreatedDateAfter at this time exists and no before than in the LatestState)
 // if CreatedDateAfter at this time not exists, collect incrementally only 
when "m.LatestState.CreatedDateAfter == nil"
-func (m ApiCollectorStateManager) CanIncrementCollect() bool {
+func (m ApiCollectorStateManager) IsIncremental() bool {
        return m.LatestState.LatestSuccessStart != nil &&
                (m.LatestState.CreatedDateAfter == nil || m.CreatedDateAfter != 
nil && !m.CreatedDateAfter.Before(*m.LatestState.CreatedDateAfter))
 }
diff --git a/plugins/jira/tasks/issue_changelog_collector.go 
b/plugins/jira/tasks/issue_changelog_collector.go
index 30c47cdfe..4bf46f4e9 100644
--- a/plugins/jira/tasks/issue_changelog_collector.go
+++ b/plugins/jira/tasks/issue_changelog_collector.go
@@ -73,7 +73,7 @@ func CollectIssueChangelogs(taskCtx core.SubTaskContext) 
errors.Error {
                dal.Where("i.updated > i.created AND bi.connection_id = ?  AND 
bi.board_id = ? AND i.std_type != ? ", data.Options.ConnectionId, 
data.Options.BoardId, "Epic"),
                dal.Groupby("i.issue_id, i.updated"),
        }
-       incremental := collectorWithState.CanIncrementCollect()
+       incremental := collectorWithState.IsIncremental()
        if incremental {
                clauses = append(clauses, dal.Having("i.updated > 
max(c.issue_updated) OR  (max(c.issue_updated) IS NULL AND 
COUNT(c.changelog_id) > 0)"))
        }
diff --git a/plugins/jira/tasks/issue_collector.go 
b/plugins/jira/tasks/issue_collector.go
index 50a8504f8..91ebee2a1 100644
--- a/plugins/jira/tasks/issue_collector.go
+++ b/plugins/jira/tasks/issue_collector.go
@@ -76,7 +76,7 @@ func CollectIssues(taskCtx core.SubTaskContext) errors.Error {
                jql = fmt.Sprintf("created >= '%v' %v", 
createdDateAfter.Format("2006/01/02 15:04"), jql)
        }
 
-       incremental := collectorWithState.CanIncrementCollect()
+       incremental := collectorWithState.IsIncremental()
        if incremental {
                // user didn't specify a time range to sync, try load from 
database
                var latestUpdated models.JiraIssue
diff --git a/plugins/jira/tasks/remotelink_collector.go 
b/plugins/jira/tasks/remotelink_collector.go
index 4867225ad..504ad509b 100644
--- a/plugins/jira/tasks/remotelink_collector.go
+++ b/plugins/jira/tasks/remotelink_collector.go
@@ -67,7 +67,7 @@ func CollectRemotelinks(taskCtx core.SubTaskContext) 
errors.Error {
                dal.Where("i.updated > i.created AND bi.connection_id = ?  AND 
bi.board_id = ?  ", data.Options.ConnectionId, data.Options.BoardId),
                dal.Groupby("i.issue_id, i.updated"),
        }
-       incremental := collectorWithState.CanIncrementCollect()
+       incremental := collectorWithState.IsIncremental()
        if incremental {
                if collectorWithState.LatestState.LatestSuccessStart != nil {
                        clauses = append(clauses, dal.Having("i.updated > ? AND 
(i.updated > max(rl.issue_updated) OR max(rl.issue_updated) IS NULL)", 
collectorWithState.LatestState.LatestSuccessStart))
diff --git a/plugins/jira/tasks/worklog_collector.go 
b/plugins/jira/tasks/worklog_collector.go
index 3958a81a4..fc3d84392 100644
--- a/plugins/jira/tasks/worklog_collector.go
+++ b/plugins/jira/tasks/worklog_collector.go
@@ -66,7 +66,7 @@ func CollectWorklogs(taskCtx core.SubTaskContext) 
errors.Error {
                dal.Where("i.updated > i.created AND bi.connection_id = ?  AND 
bi.board_id = ?  ", data.Options.ConnectionId, data.Options.BoardId),
                dal.Groupby("i.issue_id, i.updated"),
        }
-       incremental := collectorWithState.CanIncrementCollect()
+       incremental := collectorWithState.IsIncremental()
        if incremental {
                clauses = append(clauses, dal.Having("i.updated > 
max(wl.issue_updated) OR  (max(wl.issue_updated) IS NULL AND 
COUNT(wl.worklog_id) > 0)"))
        }
diff --git a/plugins/tapd/impl/impl.go b/plugins/tapd/impl/impl.go
index ebfdc7dba..2f977dba6 100644
--- a/plugins/tapd/impl/impl.go
+++ b/plugins/tapd/impl/impl.go
@@ -163,6 +163,7 @@ func (plugin Tapd) SubTaskMetas() []core.SubTaskMeta {
 }
 
 func (plugin Tapd) PrepareTaskData(taskCtx core.TaskContext, options 
map[string]interface{}) (interface{}, errors.Error) {
+       logger := taskCtx.GetLogger()
        var op tasks.TapdOptions
        err := helper.Decode(options, &op, nil)
        if err != nil {
@@ -180,16 +181,8 @@ func (plugin Tapd) PrepareTaskData(taskCtx 
core.TaskContext, options map[string]
        if err != nil {
                return nil, err
        }
-
-       var since time.Time
-       if op.Since != "" {
-               since, err = errors.Convert01(time.Parse(time.RFC3339, 
op.Since))
-               if err != nil {
-                       return nil, errors.BadInput.Wrap(err, "invalid value 
for `since`")
-               }
-       }
        if connection.RateLimitPerHour == 0 {
-               connection.RateLimitPerHour = 6000
+               connection.RateLimitPerHour = 3600
        }
        tapdApiClient, err := tasks.NewTapdApiClient(taskCtx, connection)
        if err != nil {
@@ -205,8 +198,16 @@ func (plugin Tapd) PrepareTaskData(taskCtx 
core.TaskContext, options map[string]
                ApiClient:  tapdApiClient,
                Connection: connection,
        }
-       if !since.IsZero() {
-               taskData.Since = &since
+       var createdDateAfter time.Time
+       if op.CreatedDateAfter != "" {
+               createdDateAfter, err = 
errors.Convert01(time.Parse(time.RFC3339, op.CreatedDateAfter))
+               if err != nil {
+                       return nil, errors.BadInput.Wrap(err, "invalid value 
for `createdDateAfter`")
+               }
+       }
+       if !createdDateAfter.IsZero() {
+               taskData.CreatedDateAfter = &createdDateAfter
+               logger.Debug("collect data updated createdDateAfter %s", 
createdDateAfter)
        }
        return taskData, nil
 }
diff --git a/plugins/tapd/tapd.go b/plugins/tapd/tapd.go
index b741ffd25..6c95a5eed 100644
--- a/plugins/tapd/tapd.go
+++ b/plugins/tapd/tapd.go
@@ -31,6 +31,7 @@ func main() {
        connectionId := cmd.Flags().Uint64P("connection", "c", 0, "tapd 
connection id")
        workspaceId := cmd.Flags().Uint64P("workspace", "w", 0, "tapd workspace 
id")
        companyId := cmd.Flags().Uint64P("company", "o", 0, "tapd company id")
+       createdDateAfter := cmd.Flags().StringP("createdDateAfter", "a", "", 
"collect data that are created after specified time, ie 2006-05-06T07:08:09Z")
        err := cmd.MarkFlagRequired("connection")
        if err != nil {
                panic(err)
@@ -42,9 +43,10 @@ func main() {
 
        cmd.Run = func(c *cobra.Command, args []string) {
                runner.DirectRun(c, args, PluginEntry, map[string]interface{}{
-                       "connectionId": *connectionId,
-                       "workspaceId":  *workspaceId,
-                       "companyId":    *companyId,
+                       "connectionId":     *connectionId,
+                       "workspaceId":      *workspaceId,
+                       "companyId":        *companyId,
+                       "createdDateAfter": *createdDateAfter,
                })
                //
                // cfg := config.GetConfig()
diff --git a/plugins/tapd/tasks/bug_changelog_collector.go 
b/plugins/tapd/tasks/bug_changelog_collector.go
index 6b395c297..20c242c66 100644
--- a/plugins/tapd/tasks/bug_changelog_collector.go
+++ b/plugins/tapd/tasks/bug_changelog_collector.go
@@ -19,15 +19,11 @@ package tasks
 
 import (
        "fmt"
-       "net/url"
-       "time"
-
        "github.com/apache/incubator-devlake/errors"
+       "net/url"
 
        "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/apache/incubator-devlake/plugins/core/dal"
        "github.com/apache/incubator-devlake/plugins/helper"
-       "github.com/apache/incubator-devlake/plugins/tapd/models"
 )
 
 const RAW_BUG_CHANGELOG_TABLE = "tapd_api_bug_changelogs"
@@ -36,42 +32,34 @@ var _ core.SubTaskEntryPoint = CollectBugChangelogs
 
 func CollectBugChangelogs(taskCtx core.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_BUG_CHANGELOG_TABLE, false)
-       db := taskCtx.GetDal()
        logger := taskCtx.GetLogger()
        logger.Info("collect storyChangelogs")
-       since := data.Since
-       incremental := false
-       if since == nil {
-               // user didn't specify a time range to sync, try load from 
database
-               var latestUpdated models.TapdBugChangelog
-               clauses := []dal.Clause{
-                       dal.Where("connection_id = ? and workspace_id = ?", 
data.Options.ConnectionId, data.Options.WorkspaceId),
-                       dal.Orderby("created DESC"),
-               }
-               err := db.First(&latestUpdated, clauses...)
-               if err != nil && !db.IsErrorNotFound(err) {
-                       return errors.NotFound.Wrap(err, "failed to get latest 
tapd changelog record")
-               }
-               if latestUpdated.Id > 0 {
-                       since = (*time.Time)(latestUpdated.Created)
-                       incremental = true
-               }
+       collectorWithState, err := 
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+       if err != nil {
+               return err
        }
+       incremental := collectorWithState.IsIncremental()
 
-       collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
-               RawDataSubTaskArgs: *rawDataSubTaskArgs,
-               Incremental:        incremental,
-               ApiClient:          data.ApiClient,
-               PageSize:           100,
-               UrlTemplate:        "bug_changes",
+       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+               Incremental: incremental,
+               ApiClient:   data.ApiClient,
+               PageSize:    100,
+               UrlTemplate: "bug_changes",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("workspace_id", fmt.Sprintf("%v", 
data.Options.WorkspaceId))
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
-                       query.Set("order", "created asc")
-                       if since != nil {
-                               query.Set("created", fmt.Sprintf(">%s", 
since.In(data.Options.CstZone).Format("2006-01-02")))
+                       query.Set("order", "created%20desc")
+                       if data.CreatedDateAfter != nil {
+                               query.Set("created",
+                                       fmt.Sprintf(">%s",
+                                               
data.CreatedDateAfter.In(data.Options.CstZone).Format("2006-01-02")))
+                       }
+                       if incremental {
+                               query.Set("created",
+                                       fmt.Sprintf(">%s",
+                                               
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
@@ -81,7 +69,7 @@ func CollectBugChangelogs(taskCtx core.SubTaskContext) 
errors.Error {
                logger.Error(err, "collect story changelog error")
                return err
        }
-       return collector.Execute()
+       return collectorWithState.Execute()
 }
 
 var CollectBugChangelogMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/bug_collector.go 
b/plugins/tapd/tasks/bug_collector.go
index 570d2824c..796878b82 100644
--- a/plugins/tapd/tasks/bug_collector.go
+++ b/plugins/tapd/tasks/bug_collector.go
@@ -19,15 +19,11 @@ package tasks
 
 import (
        "fmt"
-       "net/url"
-       "time"
-
        "github.com/apache/incubator-devlake/errors"
+       "net/url"
 
        "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/apache/incubator-devlake/plugins/core/dal"
        "github.com/apache/incubator-devlake/plugins/helper"
-       "github.com/apache/incubator-devlake/plugins/tapd/models"
 )
 
 const RAW_BUG_TABLE = "tapd_api_bugs"
@@ -36,34 +32,19 @@ var _ core.SubTaskEntryPoint = CollectBugs
 
 func CollectBugs(taskCtx core.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_BUG_TABLE, false)
-       db := taskCtx.GetDal()
        logger := taskCtx.GetLogger()
        logger.Info("collect bugs")
-       since := data.Since
-       incremental := false
-       if since == nil {
-               // user didn't specify a time range to sync, try load from 
database
-               var latestUpdated models.TapdBug
-               clauses := []dal.Clause{
-                       dal.Where("connection_id = ? and workspace_id = ?", 
data.Options.ConnectionId, data.Options.WorkspaceId),
-                       dal.Orderby("modified DESC"),
-               }
-               err := db.First(&latestUpdated, clauses...)
-               if err != nil && !db.IsErrorNotFound(err) {
-                       return errors.Default.Wrap(err, "failed to get latest 
tapd changelog record")
-               }
-               if latestUpdated.Id > 0 {
-                       since = (*time.Time)(latestUpdated.Modified)
-                       incremental = true
-               }
+       collectorWithState, err := 
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+       if err != nil {
+               return err
        }
+       incremental := collectorWithState.IsIncremental()
 
-       collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
-               RawDataSubTaskArgs: *rawDataSubTaskArgs,
-               Incremental:        incremental,
-               ApiClient:          data.ApiClient,
-               PageSize:           100,
-               UrlTemplate:        "bugs",
+       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+               Incremental: incremental,
+               ApiClient:   data.ApiClient,
+               PageSize:    100,
+               UrlTemplate: "bugs",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("workspace_id", fmt.Sprintf("%v", 
data.Options.WorkspaceId))
@@ -71,8 +52,15 @@ func CollectBugs(taskCtx core.SubTaskContext) errors.Error {
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("fields", "labels")
                        query.Set("order", "created asc")
-                       if since != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", 
fmt.Sprintf(">%s", since.In(data.Options.CstZone).Format("2006-01-02"))))
+                       if data.CreatedDateAfter != nil {
+                               query.Set("created",
+                                       fmt.Sprintf(">%s",
+                                               
data.CreatedDateAfter.In(data.Options.CstZone).Format("2006-01-02")))
+                       }
+                       if incremental {
+                               query.Set("modified",
+                                       fmt.Sprintf(">%s",
+                                               
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
@@ -82,7 +70,7 @@ func CollectBugs(taskCtx core.SubTaskContext) errors.Error {
                logger.Error(err, "collect bug error")
                return err
        }
-       return collector.Execute()
+       return collectorWithState.Execute()
 }
 
 var CollectBugMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/bug_commit_collector.go 
b/plugins/tapd/tasks/bug_commit_collector.go
index b8a2b5fdc..928ec36f3 100644
--- a/plugins/tapd/tasks/bug_commit_collector.go
+++ b/plugins/tapd/tasks/bug_commit_collector.go
@@ -37,16 +37,23 @@ var _ core.SubTaskEntryPoint = CollectBugCommits
 func CollectBugCommits(taskCtx core.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_BUG_COMMIT_TABLE, false)
        db := taskCtx.GetDal()
+       collectorWithState, err := 
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+       if err != nil {
+               return err
+       }
        logger := taskCtx.GetLogger()
        logger.Info("collect issueCommits")
-       since := data.Since
+       incremental := collectorWithState.IsIncremental()
        clauses := []dal.Clause{
                dal.Select("_tool_tapd_bugs.id as issue_id, modified as 
update_time"),
                dal.From(&models.TapdBug{}),
                dal.Where("_tool_tapd_bugs.connection_id = ? and 
_tool_tapd_bugs.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if since != nil {
-               clauses = append(clauses, dal.Where("modified > ?", since))
+       if collectorWithState.CreatedDateAfter != nil {
+               clauses = append(clauses, dal.Where("created > ?", 
*collectorWithState.CreatedDateAfter))
+       }
+       if incremental {
+               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -57,13 +64,12 @@ func CollectBugCommits(taskCtx core.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
-               RawDataSubTaskArgs: *rawDataSubTaskArgs,
-               Incremental:        since != nil,
-               ApiClient:          data.ApiClient,
-               PageSize:           100,
-               Input:              iterator,
-               UrlTemplate:        "code_commit_infos",
+       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+               ApiClient:   data.ApiClient,
+               PageSize:    100,
+               Incremental: incremental,
+               Input:       iterator,
+               UrlTemplate: "code_commit_infos",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        input := reqData.Input.(*models.Input)
                        query := url.Values{}
@@ -85,7 +91,7 @@ func CollectBugCommits(taskCtx core.SubTaskContext) 
errors.Error {
                logger.Error(err, "collect issueCommit error")
                return err
        }
-       return collector.Execute()
+       return collectorWithState.Execute()
 }
 
 var CollectBugCommitMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/iteration_collector.go 
b/plugins/tapd/tasks/iteration_collector.go
index a3b379be3..beae5fe95 100644
--- a/plugins/tapd/tasks/iteration_collector.go
+++ b/plugins/tapd/tasks/iteration_collector.go
@@ -20,16 +20,12 @@ package tasks
 import (
        "encoding/json"
        "fmt"
+       "github.com/apache/incubator-devlake/errors"
        "net/http"
        "net/url"
-       "time"
-
-       "github.com/apache/incubator-devlake/errors"
 
        "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/apache/incubator-devlake/plugins/core/dal"
        "github.com/apache/incubator-devlake/plugins/helper"
-       "github.com/apache/incubator-devlake/plugins/tapd/models"
 )
 
 const RAW_ITERATION_TABLE = "tapd_api_iterations"
@@ -38,42 +34,35 @@ var _ core.SubTaskEntryPoint = CollectIterations
 
 func CollectIterations(taskCtx core.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_ITERATION_TABLE, false)
-       db := taskCtx.GetDal()
        logger := taskCtx.GetLogger()
        logger.Info("collect iterations")
-       since := data.Since
-       incremental := false
-       if since == nil {
-               // user didn't specify a time range to sync, try load from 
database
-               var latestUpdated models.TapdIteration
-               clauses := []dal.Clause{
-                       dal.Where("connection_id = ? and workspace_id = ?", 
data.Options.ConnectionId, data.Options.WorkspaceId),
-                       dal.Orderby("modified DESC"),
-               }
-               err := db.First(&latestUpdated, clauses...)
-               if err != nil && !db.IsErrorNotFound(err) {
-                       return errors.NotFound.Wrap(err, "failed to get latest 
tapd changelog record")
-               }
-               if latestUpdated.Id > 0 {
-                       since = (*time.Time)(latestUpdated.Modified)
-                       incremental = true
-               }
+       collectorWithState, err := 
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+       if err != nil {
+               return err
        }
-       collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
-               RawDataSubTaskArgs: *rawDataSubTaskArgs,
-               Incremental:        incremental,
-               ApiClient:          data.ApiClient,
-               PageSize:           100,
-               Concurrency:        3,
-               UrlTemplate:        "iterations",
+       incremental := collectorWithState.IsIncremental()
+
+       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+               Incremental: incremental,
+               ApiClient:   data.ApiClient,
+               PageSize:    100,
+               Concurrency: 3,
+               UrlTemplate: "iterations",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("workspace_id", fmt.Sprintf("%v", 
data.Options.WorkspaceId))
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if since != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", 
since.In(data.Options.CstZone).Format("2006-01-02")))
+                       if data.CreatedDateAfter != nil {
+                               query.Set("created",
+                                       fmt.Sprintf(">%s",
+                                               
data.CreatedDateAfter.In(data.Options.CstZone).Format("2006-01-02")))
+                       }
+                       if incremental {
+                               query.Set("modified",
+                                       fmt.Sprintf(">%s",
+                                               
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
@@ -89,7 +78,7 @@ func CollectIterations(taskCtx core.SubTaskContext) 
errors.Error {
                logger.Error(err, "collect iteration error")
                return err
        }
-       return collector.Execute()
+       return collectorWithState.Execute()
 }
 
 var CollectIterationMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/story_bug_collector.go 
b/plugins/tapd/tasks/story_bug_collector.go
index 02181fd28..e5b266633 100644
--- a/plugins/tapd/tasks/story_bug_collector.go
+++ b/plugins/tapd/tasks/story_bug_collector.go
@@ -35,16 +35,23 @@ var _ core.SubTaskEntryPoint = CollectStoryBugs
 func CollectStoryBugs(taskCtx core.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_STORY_BUG_TABLE, false)
        db := taskCtx.GetDal()
+       collectorWithState, err := 
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+       if err != nil {
+               return err
+       }
        logger := taskCtx.GetLogger()
        logger.Info("collect storyBugs")
-       since := data.Since
+       incremental := collectorWithState.IsIncremental()
        clauses := []dal.Clause{
                dal.Select("id as issue_id, modified as update_time"),
                dal.From(&models.TapdStory{}),
                dal.Where("_tool_tapd_stories.connection_id = ? and 
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if since != nil {
-               clauses = append(clauses, dal.Where("modified > ?", since))
+       if collectorWithState.CreatedDateAfter != nil {
+               clauses = append(clauses, dal.Where("created > ?", 
*collectorWithState.CreatedDateAfter))
+       }
+       if incremental {
+               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -54,12 +61,12 @@ func CollectStoryBugs(taskCtx core.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
-               RawDataSubTaskArgs: *rawDataSubTaskArgs,
-               ApiClient:          data.ApiClient,
-               Incremental:        since != nil,
-               Input:              iterator,
-               UrlTemplate:        "stories/get_related_bugs",
+       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+               ApiClient:   data.ApiClient,
+               PageSize:    100,
+               Incremental: incremental,
+               Input:       iterator,
+               UrlTemplate: "stories/get_related_bugs",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        input := reqData.Input.(*models.Input)
                        query := url.Values{}
@@ -73,7 +80,7 @@ func CollectStoryBugs(taskCtx core.SubTaskContext) 
errors.Error {
                logger.Error(err, "collect storyBug error")
                return err
        }
-       return collector.Execute()
+       return collectorWithState.Execute()
 }
 
 var CollectStoryBugMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/story_changelog_collector.go 
b/plugins/tapd/tasks/story_changelog_collector.go
index 9de0e97bb..e88e2e49d 100644
--- a/plugins/tapd/tasks/story_changelog_collector.go
+++ b/plugins/tapd/tasks/story_changelog_collector.go
@@ -19,14 +19,10 @@ package tasks
 
 import (
        "fmt"
-       "net/url"
-       "time"
-
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/apache/incubator-devlake/plugins/core/dal"
        "github.com/apache/incubator-devlake/plugins/helper"
-       "github.com/apache/incubator-devlake/plugins/tapd/models"
+       "net/url"
 )
 
 const RAW_STORY_CHANGELOG_TABLE = "tapd_api_story_changelogs"
@@ -35,42 +31,34 @@ var _ core.SubTaskEntryPoint = CollectStoryChangelogs
 
 func CollectStoryChangelogs(taskCtx core.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_STORY_CHANGELOG_TABLE, false)
-       db := taskCtx.GetDal()
+       collectorWithState, err := 
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+       if err != nil {
+               return err
+       }
        logger := taskCtx.GetLogger()
        logger.Info("collect storyChangelogs")
-       since := data.Since
-       incremental := false
-       if since == nil {
-               // user didn't specify a time range to sync, try load from 
database
-               var latestUpdated models.TapdStoryChangelog
-               clauses := []dal.Clause{
-                       dal.Where("connection_id = ? and workspace_id = ?", 
data.Options.ConnectionId, data.Options.WorkspaceId),
-                       dal.Orderby("created DESC"),
-               }
-               err := db.First(&latestUpdated, clauses...)
-               if err != nil && !db.IsErrorNotFound(err) {
-                       return errors.NotFound.Wrap(err, "failed to get latest 
tapd changelog record")
-               }
-               if latestUpdated.Id > 0 {
-                       since = (*time.Time)(latestUpdated.Created)
-                       incremental = true
-               }
-       }
+       incremental := collectorWithState.IsIncremental()
 
-       collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
-               RawDataSubTaskArgs: *rawDataSubTaskArgs,
-               Incremental:        incremental,
-               ApiClient:          data.ApiClient,
-               PageSize:           100,
-               UrlTemplate:        "story_changes",
+       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+               Incremental: incremental,
+               ApiClient:   data.ApiClient,
+               PageSize:    100,
+               UrlTemplate: "story_changes",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("workspace_id", fmt.Sprintf("%v", 
data.Options.WorkspaceId))
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if since != nil {
-                               query.Set("created", fmt.Sprintf(">%s", 
since.In(data.Options.CstZone).Format("2006-01-02")))
+                       if data.CreatedDateAfter != nil {
+                               query.Set("created",
+                                       fmt.Sprintf(">%s",
+                                               
data.CreatedDateAfter.In(data.Options.CstZone).Format("2006-01-02")))
+                       }
+                       if incremental {
+                               query.Set("created",
+                                       fmt.Sprintf(">%s",
+                                               
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
@@ -80,7 +68,7 @@ func CollectStoryChangelogs(taskCtx core.SubTaskContext) 
errors.Error {
                logger.Error(err, "collect story changelog error")
                return err
        }
-       return collector.Execute()
+       return collectorWithState.Execute()
 }
 
 var CollectStoryChangelogMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/story_collector.go 
b/plugins/tapd/tasks/story_collector.go
index 887f4671a..7e845c882 100644
--- a/plugins/tapd/tasks/story_collector.go
+++ b/plugins/tapd/tasks/story_collector.go
@@ -19,15 +19,11 @@ package tasks
 
 import (
        "fmt"
-       "net/url"
-       "time"
-
        "github.com/apache/incubator-devlake/errors"
+       "net/url"
 
        "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/apache/incubator-devlake/plugins/core/dal"
        "github.com/apache/incubator-devlake/plugins/helper"
-       "github.com/apache/incubator-devlake/plugins/tapd/models"
 )
 
 const RAW_STORY_TABLE = "tapd_api_stories"
@@ -36,33 +32,18 @@ var _ core.SubTaskEntryPoint = CollectStorys
 
 func CollectStorys(taskCtx core.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_STORY_TABLE, false)
-       db := taskCtx.GetDal()
        logger := taskCtx.GetLogger()
        logger.Info("collect stories")
-       since := data.Since
-       incremental := false
-       if since == nil {
-               // user didn't specify a time range to sync, try load from 
database
-               var latestUpdated models.TapdStory
-               clauses := []dal.Clause{
-                       dal.Where("connection_id = ? and workspace_id = ?", 
data.Options.ConnectionId, data.Options.WorkspaceId),
-                       dal.Orderby("modified DESC"),
-               }
-               err := db.First(&latestUpdated, clauses...)
-               if err != nil && !db.IsErrorNotFound(err) {
-                       return errors.Default.Wrap(err, "failed to get latest 
tapd changelog record")
-               }
-               if latestUpdated.Id > 0 {
-                       since = (*time.Time)(latestUpdated.Modified)
-                       incremental = true
-               }
+       collectorWithState, err := 
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+       if err != nil {
+               return err
        }
-       collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
-               RawDataSubTaskArgs: *rawDataSubTaskArgs,
-               Incremental:        incremental,
-               ApiClient:          data.ApiClient,
-               PageSize:           100,
-               UrlTemplate:        "stories",
+       incremental := collectorWithState.IsIncremental()
+       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+               Incremental: incremental,
+               ApiClient:   data.ApiClient,
+               PageSize:    100,
+               UrlTemplate: "stories",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("workspace_id", fmt.Sprintf("%v", 
data.Options.WorkspaceId))
@@ -70,8 +51,15 @@ func CollectStorys(taskCtx core.SubTaskContext) errors.Error 
{
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("fields", "labels")
                        query.Set("order", "created asc")
-                       if since != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", 
since.In(data.Options.CstZone).Format("2006-01-02")))
+                       if data.CreatedDateAfter != nil {
+                               query.Set("created",
+                                       fmt.Sprintf(">%s",
+                                               
data.CreatedDateAfter.In(data.Options.CstZone).Format("2006-01-02")))
+                       }
+                       if incremental {
+                               query.Set("modified",
+                                       fmt.Sprintf(">%s",
+                                               
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
@@ -81,7 +69,7 @@ func CollectStorys(taskCtx core.SubTaskContext) errors.Error {
                logger.Error(err, "collect story error")
                return err
        }
-       return collector.Execute()
+       return collectorWithState.Execute()
 }
 
 var CollectStoryMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/story_commit_collector.go 
b/plugins/tapd/tasks/story_commit_collector.go
index 63194c8d3..8178832ef 100644
--- a/plugins/tapd/tasks/story_commit_collector.go
+++ b/plugins/tapd/tasks/story_commit_collector.go
@@ -37,16 +37,23 @@ var _ core.SubTaskEntryPoint = CollectStoryCommits
 func CollectStoryCommits(taskCtx core.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_STORY_COMMIT_TABLE, false)
        db := taskCtx.GetDal()
+       collectorWithState, err := 
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+       if err != nil {
+               return err
+       }
        logger := taskCtx.GetLogger()
        logger.Info("collect issueCommits")
-       since := data.Since
+       incremental := collectorWithState.IsIncremental()
        clauses := []dal.Clause{
                dal.Select("_tool_tapd_stories.id as issue_id, modified as 
update_time"),
                dal.From(&models.TapdStory{}),
                dal.Where("_tool_tapd_stories.connection_id = ? and 
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if since != nil {
-               clauses = append(clauses, dal.Where("modified > ?", since))
+       if collectorWithState.CreatedDateAfter != nil {
+               clauses = append(clauses, dal.Where("created > ?", 
*collectorWithState.CreatedDateAfter))
+       }
+       if incremental {
+               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -56,12 +63,12 @@ func CollectStoryCommits(taskCtx core.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
-               RawDataSubTaskArgs: *rawDataSubTaskArgs,
-               Incremental:        since != nil,
-               ApiClient:          data.ApiClient,
-               Input:              iterator,
-               UrlTemplate:        "code_commit_infos",
+       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+               ApiClient:   data.ApiClient,
+               PageSize:    100,
+               Incremental: incremental,
+               Input:       iterator,
+               UrlTemplate: "code_commit_infos",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        input := reqData.Input.(*models.Input)
                        query := url.Values{}
@@ -83,7 +90,7 @@ func CollectStoryCommits(taskCtx core.SubTaskContext) 
errors.Error {
                logger.Error(err, "collect issueCommit error")
                return err
        }
-       return collector.Execute()
+       return collectorWithState.Execute()
 }
 
 var CollectStoryCommitMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/task_changelog_collector.go 
b/plugins/tapd/tasks/task_changelog_collector.go
index 88cb80ede..2fac3d27b 100644
--- a/plugins/tapd/tasks/task_changelog_collector.go
+++ b/plugins/tapd/tasks/task_changelog_collector.go
@@ -19,15 +19,11 @@ package tasks
 
 import (
        "fmt"
-       "net/url"
-       "time"
-
        "github.com/apache/incubator-devlake/errors"
+       "net/url"
 
        "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/apache/incubator-devlake/plugins/core/dal"
        "github.com/apache/incubator-devlake/plugins/helper"
-       "github.com/apache/incubator-devlake/plugins/tapd/models"
 )
 
 const RAW_TASK_CHANGELOG_TABLE = "tapd_api_task_changelogs"
@@ -36,42 +32,34 @@ var _ core.SubTaskEntryPoint = CollectTaskChangelogs
 
 func CollectTaskChangelogs(taskCtx core.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_TASK_CHANGELOG_TABLE, false)
-       db := taskCtx.GetDal()
+       collectorWithState, err := 
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+       if err != nil {
+               return err
+       }
        logger := taskCtx.GetLogger()
        logger.Info("collect taskChangelogs")
-       since := data.Since
-       incremental := false
-       if since == nil {
-               // user didn't specify a time range to sync, try load from 
database
-               var latestUpdated models.TapdTaskChangelog
-               clauses := []dal.Clause{
-                       dal.Where("connection_id = ? and workspace_id = ?", 
data.Options.ConnectionId, data.Options.WorkspaceId),
-                       dal.Orderby("created DESC"),
-               }
-               err := db.First(&latestUpdated, clauses...)
-               if err != nil && !db.IsErrorNotFound(err) {
-                       return errors.NotFound.Wrap(err, "failed to get latest 
tapd changelog record")
-               }
-               if latestUpdated.Id > 0 {
-                       since = (*time.Time)(latestUpdated.Created)
-                       incremental = true
-               }
-       }
+       incremental := collectorWithState.IsIncremental()
 
-       collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
-               RawDataSubTaskArgs: *rawDataSubTaskArgs,
-               Incremental:        incremental,
-               ApiClient:          data.ApiClient,
-               PageSize:           100,
-               UrlTemplate:        "task_changes",
+       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+               Incremental: incremental,
+               ApiClient:   data.ApiClient,
+               PageSize:    100,
+               UrlTemplate: "task_changes",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("workspace_id", fmt.Sprintf("%v", 
data.Options.WorkspaceId))
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if since != nil {
-                               query.Set("created", fmt.Sprintf(">%s", 
since.In(data.Options.CstZone).Format("2006-01-02")))
+                       if data.CreatedDateAfter != nil {
+                               query.Set("created",
+                                       fmt.Sprintf(">%s",
+                                               
data.CreatedDateAfter.In(data.Options.CstZone).Format("2006-01-02")))
+                       }
+                       if incremental {
+                               query.Set("created",
+                                       fmt.Sprintf(">%s",
+                                               
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
@@ -81,7 +69,7 @@ func CollectTaskChangelogs(taskCtx core.SubTaskContext) 
errors.Error {
                logger.Error(err, "collect task changelog error")
                return err
        }
-       return collector.Execute()
+       return collectorWithState.Execute()
 }
 
 var CollectTaskChangelogMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/task_collector.go 
b/plugins/tapd/tasks/task_collector.go
index 68a40fd15..7d8ef405b 100644
--- a/plugins/tapd/tasks/task_collector.go
+++ b/plugins/tapd/tasks/task_collector.go
@@ -19,15 +19,11 @@ package tasks
 
 import (
        "fmt"
-       "net/url"
-       "time"
-
        "github.com/apache/incubator-devlake/errors"
+       "net/url"
 
        "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/apache/incubator-devlake/plugins/core/dal"
        "github.com/apache/incubator-devlake/plugins/helper"
-       "github.com/apache/incubator-devlake/plugins/tapd/models"
 )
 
 const RAW_TASK_TABLE = "tapd_api_tasks"
@@ -36,29 +32,13 @@ var _ core.SubTaskEntryPoint = CollectTasks
 
 func CollectTasks(taskCtx core.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_TASK_TABLE, false)
-       db := taskCtx.GetDal()
-
        logger := taskCtx.GetLogger()
        logger.Info("collect tasks")
-
-       since := data.Since
-       incremental := false
-       if since == nil {
-               // user didn't specify a time range to sync, try load from 
database
-               var latestUpdated models.TapdTask
-               clauses := []dal.Clause{
-                       dal.Where("connection_id = ? and workspace_id = ?", 
data.Options.ConnectionId, data.Options.WorkspaceId),
-                       dal.Orderby("modified DESC"),
-               }
-               err := db.First(&latestUpdated, clauses...)
-               if err != nil && !db.IsErrorNotFound(err) {
-                       return errors.NotFound.Wrap(err, "failed to get latest 
tapd changelog record")
-               }
-               if latestUpdated.Id > 0 {
-                       since = (*time.Time)(latestUpdated.Modified)
-                       incremental = true
-               }
+       collectorWithState, err := 
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+       if err != nil {
+               return err
        }
+       incremental := collectorWithState.IsIncremental()
 
        collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
                RawDataSubTaskArgs: *rawDataSubTaskArgs,
@@ -73,8 +53,15 @@ func CollectTasks(taskCtx core.SubTaskContext) errors.Error {
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("fields", "labels")
                        query.Set("order", "created asc")
-                       if since != nil {
-                               query.Set("modified", fmt.Sprintf(">%s", 
since.In(data.Options.CstZone).Format("2006-01-02")))
+                       if data.CreatedDateAfter != nil {
+                               query.Set("created",
+                                       fmt.Sprintf(">%s",
+                                               
data.CreatedDateAfter.In(data.Options.CstZone).Format("2006-01-02")))
+                       }
+                       if incremental {
+                               query.Set("modified",
+                                       fmt.Sprintf(">%s",
+                                               
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
diff --git a/plugins/tapd/tasks/task_commit_collector.go 
b/plugins/tapd/tasks/task_commit_collector.go
index f7a6bcb59..7410cc76c 100644
--- a/plugins/tapd/tasks/task_commit_collector.go
+++ b/plugins/tapd/tasks/task_commit_collector.go
@@ -37,16 +37,23 @@ var _ core.SubTaskEntryPoint = CollectTaskCommits
 func CollectTaskCommits(taskCtx core.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_TASK_COMMIT_TABLE, false)
        db := taskCtx.GetDal()
+       collectorWithState, err := 
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+       if err != nil {
+               return err
+       }
        logger := taskCtx.GetLogger()
        logger.Info("collect issueCommits")
-       since := data.Since
+       incremental := collectorWithState.IsIncremental()
        clauses := []dal.Clause{
                dal.Select("_tool_tapd_tasks.id as issue_id, modified as 
update_time"),
                dal.From(&models.TapdTask{}),
                dal.Where("_tool_tapd_tasks.connection_id = ? and 
_tool_tapd_tasks.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if since != nil {
-               clauses = append(clauses, dal.Where("modified > ?", since))
+       if collectorWithState.CreatedDateAfter != nil {
+               clauses = append(clauses, dal.Where("created > ?", 
*collectorWithState.CreatedDateAfter))
+       }
+       if incremental {
+               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
        }
        cursor, err := db.Cursor(clauses...)
        if err != nil {
@@ -56,10 +63,10 @@ func CollectTaskCommits(taskCtx core.SubTaskContext) 
errors.Error {
        if err != nil {
                return err
        }
-       collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
-               RawDataSubTaskArgs: *rawDataSubTaskArgs,
-               Incremental:        since != nil,
-               ApiClient:          data.ApiClient,
+       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+               ApiClient:   data.ApiClient,
+               PageSize:    100,
+               Incremental: incremental,
                //PageSize:    100,
                Input:       iterator,
                UrlTemplate: "code_commit_infos",
@@ -84,7 +91,7 @@ func CollectTaskCommits(taskCtx core.SubTaskContext) 
errors.Error {
                logger.Error(err, "collect issueCommit error")
                return err
        }
-       return collector.Execute()
+       return collectorWithState.Execute()
 }
 
 var CollectTaskCommitMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/task_data.go b/plugins/tapd/tasks/task_data.go
index 49680deaf..b14a9d524 100644
--- a/plugins/tapd/tasks/task_data.go
+++ b/plugins/tapd/tasks/task_data.go
@@ -29,16 +29,16 @@ type TapdOptions struct {
        WorkspaceId         uint64   `mapstruct:"workspaceId"`
        CompanyId           uint64   `mapstruct:"companyId"`
        Tasks               []string `mapstruct:"tasks,omitempty"`
-       Since               string
+       CreatedDateAfter    string   `json:"createdDateAfter" 
mapstructure:"createdDateAfter,omitempty"`
        CstZone             *time.Location
        TransformationRules TransformationRules `json:"transformationRules"`
 }
 
 type TapdTaskData struct {
-       Options    *TapdOptions
-       ApiClient  *helper.ApiAsyncClient
-       Since      *time.Time
-       Connection *models.TapdConnection
+       Options          *TapdOptions
+       ApiClient        *helper.ApiAsyncClient
+       CreatedDateAfter *time.Time
+       Connection       *models.TapdConnection
 }
 
 type TypeMapping struct {
diff --git a/plugins/tapd/tasks/worklog_collector.go 
b/plugins/tapd/tasks/worklog_collector.go
index 084b01820..7ce7ca908 100644
--- a/plugins/tapd/tasks/worklog_collector.go
+++ b/plugins/tapd/tasks/worklog_collector.go
@@ -19,15 +19,11 @@ package tasks
 
 import (
        "fmt"
-       "net/url"
-       "time"
-
        "github.com/apache/incubator-devlake/errors"
+       "net/url"
 
        "github.com/apache/incubator-devlake/plugins/core"
-       "github.com/apache/incubator-devlake/plugins/core/dal"
        "github.com/apache/incubator-devlake/plugins/helper"
-       "github.com/apache/incubator-devlake/plugins/tapd/models"
 )
 
 const RAW_WORKLOG_TABLE = "tapd_api_worklogs"
@@ -36,41 +32,34 @@ var _ core.SubTaskEntryPoint = CollectWorklogs
 
 func CollectWorklogs(taskCtx core.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_WORKLOG_TABLE, false)
-       db := taskCtx.GetDal()
        logger := taskCtx.GetLogger()
        logger.Info("collect worklogs")
-       since := data.Since
-       incremental := false
-       if since == nil {
-               // user didn't specify a time range to sync, try load from 
database
-               var latestUpdated models.TapdWorklog
-               clauses := []dal.Clause{
-                       dal.Where("connection_id = ? and workspace_id = ?", 
data.Options.ConnectionId, data.Options.WorkspaceId),
-                       dal.Orderby("created DESC"),
-               }
-               err := db.First(&latestUpdated, clauses...)
-               if err != nil && !db.IsErrorNotFound(err) {
-                       return errors.NotFound.Wrap(err, "failed to get latest 
tapd changelog record")
-               }
-               if latestUpdated.Id > 0 {
-                       since = (*time.Time)(latestUpdated.Created)
-                       incremental = true
-               }
+       collectorWithState, err := 
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+       if err != nil {
+               return err
        }
-       collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
-               RawDataSubTaskArgs: *rawDataSubTaskArgs,
-               Incremental:        incremental,
-               ApiClient:          data.ApiClient,
-               PageSize:           100,
-               UrlTemplate:        "timesheets",
+       incremental := collectorWithState.IsIncremental()
+
+       err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+               Incremental: incremental,
+               ApiClient:   data.ApiClient,
+               PageSize:    100,
+               UrlTemplate: "timesheets",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("workspace_id", fmt.Sprintf("%v", 
data.Options.WorkspaceId))
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if since != nil {
-                               query.Set("created", fmt.Sprintf(">%s", 
since.In(data.Options.CstZone).Format("2006-01-02")))
+                       if data.CreatedDateAfter != nil {
+                               query.Set("created",
+                                       fmt.Sprintf(">%s",
+                                               
data.CreatedDateAfter.In(data.Options.CstZone).Format("2006-01-02")))
+                       }
+                       if incremental {
+                               query.Set("created",
+                                       fmt.Sprintf(">%s",
+                                               
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        return query, nil
                },
@@ -80,7 +69,7 @@ func CollectWorklogs(taskCtx core.SubTaskContext) 
errors.Error {
                logger.Error(err, "collect worklog error")
                return err
        }
-       return collector.Execute()
+       return collectorWithState.Execute()
 }
 
 var CollectWorklogMeta = core.SubTaskMeta{

Reply via email to