This is an automated email from the ASF dual-hosted git repository.
likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new bf25f9be6 feat(tapd): add time filter (#4075)
bf25f9be6 is described below
commit bf25f9be63511b5e0713ff1c25568849cb5d8534
Author: Warren Chen <[email protected]>
AuthorDate: Fri Dec 30 15:29:38 2022 +0800
feat(tapd): add time filter (#4075)
---
plugins/github/github.go | 4 +-
plugins/github/tasks/cicd_run_collector.go | 2 +-
plugins/github/tasks/comment_collector.go | 2 +-
plugins/github/tasks/commit_collector.go | 2 +-
plugins/github/tasks/issue_collector.go | 2 +-
plugins/github/tasks/pr_commit_collector.go | 2 +-
plugins/github/tasks/pr_review_collector.go | 2 +-
.../github/tasks/pr_review_comment_collector.go | 2 +-
.../github_graphql/tasks/check_run_collector.go | 2 +-
plugins/gitlab/tasks/issue_collector.go | 2 +-
plugins/gitlab/tasks/mr_collector.go | 2 +-
plugins/gitlab/tasks/pipeline_collector.go | 2 +-
plugins/helper/api_collector_with_state.go | 4 +-
plugins/jira/tasks/issue_changelog_collector.go | 2 +-
plugins/jira/tasks/issue_collector.go | 2 +-
plugins/jira/tasks/remotelink_collector.go | 2 +-
plugins/jira/tasks/worklog_collector.go | 2 +-
plugins/tapd/impl/impl.go | 23 ++++-----
plugins/tapd/tapd.go | 8 ++--
plugins/tapd/tasks/bug_changelog_collector.go | 54 +++++++++------------
plugins/tapd/tasks/bug_collector.go | 52 ++++++++------------
plugins/tapd/tasks/bug_commit_collector.go | 28 ++++++-----
plugins/tapd/tasks/iteration_collector.go | 55 +++++++++-------------
plugins/tapd/tasks/story_bug_collector.go | 27 +++++++----
plugins/tapd/tasks/story_changelog_collector.go | 54 +++++++++------------
plugins/tapd/tasks/story_collector.go | 52 ++++++++------------
plugins/tapd/tasks/story_commit_collector.go | 27 +++++++----
plugins/tapd/tasks/task_changelog_collector.go | 54 +++++++++------------
plugins/tapd/tasks/task_collector.go | 41 ++++++----------
plugins/tapd/tasks/task_commit_collector.go | 23 +++++----
plugins/tapd/tasks/task_data.go | 10 ++--
plugins/tapd/tasks/worklog_collector.go | 53 +++++++++------------
32 files changed, 267 insertions(+), 332 deletions(-)
diff --git a/plugins/github/github.go b/plugins/github/github.go
index 6ee06b22c..9368e7ccc 100644
--- a/plugins/github/github.go
+++ b/plugins/github/github.go
@@ -32,7 +32,7 @@ func main() {
connectionId := cmd.Flags().Uint64P("connectionId", "c", 0, "github
connection id")
owner := cmd.Flags().StringP("owner", "o", "", "github owner")
repo := cmd.Flags().StringP("repo", "r", "", "github repo")
- CreatedDateAfter := cmd.Flags().StringP("createdDateAfter", "a", "",
"collect data that are created after specified time, ie 2006-05-06T07:08:09Z")
+ createdDateAfter := cmd.Flags().StringP("createdDateAfter", "a", "",
"collect data that are created after specified time, ie 2006-05-06T07:08:09Z")
_ = cmd.MarkFlagRequired("connectionId")
_ = cmd.MarkFlagRequired("owner")
_ = cmd.MarkFlagRequired("repo")
@@ -53,7 +53,7 @@ func main() {
"connectionId": *connectionId,
"owner": *owner,
"repo": *repo,
- "createdDateAfter": *CreatedDateAfter,
+ "createdDateAfter": *createdDateAfter,
"transformationRules": map[string]interface{}{
"prType": *prType,
"prComponent": *prComponent,
diff --git a/plugins/github/tasks/cicd_run_collector.go
b/plugins/github/tasks/cicd_run_collector.go
index 40440298d..ff0ea7cb2 100644
--- a/plugins/github/tasks/cicd_run_collector.go
+++ b/plugins/github/tasks/cicd_run_collector.go
@@ -51,7 +51,7 @@ func CollectRuns(taskCtx core.SubTaskContext) errors.Error {
return err
}
- //incremental := collectorWithState.CanIncrementCollect()
+ //incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 30,
diff --git a/plugins/github/tasks/comment_collector.go
b/plugins/github/tasks/comment_collector.go
index b295999a6..5d949747b 100644
--- a/plugins/github/tasks/comment_collector.go
+++ b/plugins/github/tasks/comment_collector.go
@@ -44,7 +44,7 @@ func CollectApiComments(taskCtx core.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.CanIncrementCollect()
+ incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
diff --git a/plugins/github/tasks/commit_collector.go
b/plugins/github/tasks/commit_collector.go
index 8fb8e7685..b223140fd 100644
--- a/plugins/github/tasks/commit_collector.go
+++ b/plugins/github/tasks/commit_collector.go
@@ -52,7 +52,7 @@ func CollectApiCommits(taskCtx core.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.CanIncrementCollect()
+ incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
diff --git a/plugins/github/tasks/issue_collector.go
b/plugins/github/tasks/issue_collector.go
index d668a8960..4478cb1d5 100644
--- a/plugins/github/tasks/issue_collector.go
+++ b/plugins/github/tasks/issue_collector.go
@@ -52,7 +52,7 @@ func CollectApiIssues(taskCtx core.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.CanIncrementCollect()
+ incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
diff --git a/plugins/github/tasks/pr_commit_collector.go
b/plugins/github/tasks/pr_commit_collector.go
index 5e62d431f..cb993071e 100644
--- a/plugins/github/tasks/pr_commit_collector.go
+++ b/plugins/github/tasks/pr_commit_collector.go
@@ -66,7 +66,7 @@ func CollectApiPullRequestCommits(taskCtx
core.SubTaskContext) errors.Error {
return err
}
- incremental := collectorWithState.CanIncrementCollect()
+ incremental := collectorWithState.IsIncremental()
clauses := []dal.Clause{
dal.Select("number, github_id"),
diff --git a/plugins/github/tasks/pr_review_collector.go
b/plugins/github/tasks/pr_review_collector.go
index 7d2d6f2eb..bf1e823b7 100644
--- a/plugins/github/tasks/pr_review_collector.go
+++ b/plugins/github/tasks/pr_review_collector.go
@@ -59,7 +59,7 @@ func CollectApiPullRequestReviews(taskCtx
core.SubTaskContext) errors.Error {
return err
}
- incremental := collectorWithState.CanIncrementCollect()
+ incremental := collectorWithState.IsIncremental()
clauses := []dal.Clause{
dal.Select("number, github_id"),
dal.From(models.GithubPullRequest{}.TableName()),
diff --git a/plugins/github/tasks/pr_review_comment_collector.go
b/plugins/github/tasks/pr_review_comment_collector.go
index 7d9e1a6b6..2f4829102 100644
--- a/plugins/github/tasks/pr_review_comment_collector.go
+++ b/plugins/github/tasks/pr_review_comment_collector.go
@@ -47,7 +47,7 @@ func CollectPrReviewComments(taskCtx core.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.CanIncrementCollect()
+ incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
diff --git a/plugins/github_graphql/tasks/check_run_collector.go
b/plugins/github_graphql/tasks/check_run_collector.go
index 2af9b0aac..68f5ab752 100644
--- a/plugins/github_graphql/tasks/check_run_collector.go
+++ b/plugins/github_graphql/tasks/check_run_collector.go
@@ -112,7 +112,7 @@ func CollectCheckRun(taskCtx core.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.CanIncrementCollect()
+ incremental := collectorWithState.IsIncremental()
clauses := []dal.Clause{
dal.Select("check_suite_node_id"),
diff --git a/plugins/gitlab/tasks/issue_collector.go
b/plugins/gitlab/tasks/issue_collector.go
index 5592dfe72..df8e4c01d 100644
--- a/plugins/gitlab/tasks/issue_collector.go
+++ b/plugins/gitlab/tasks/issue_collector.go
@@ -45,7 +45,7 @@ func CollectApiIssues(taskCtx core.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.CanIncrementCollect()
+ incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
diff --git a/plugins/gitlab/tasks/mr_collector.go
b/plugins/gitlab/tasks/mr_collector.go
index e4dddcf8d..6052ae132 100644
--- a/plugins/gitlab/tasks/mr_collector.go
+++ b/plugins/gitlab/tasks/mr_collector.go
@@ -42,7 +42,7 @@ func CollectApiMergeRequests(taskCtx core.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.CanIncrementCollect()
+ incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
diff --git a/plugins/gitlab/tasks/pipeline_collector.go
b/plugins/gitlab/tasks/pipeline_collector.go
index d47a27992..a9f3a0a28 100644
--- a/plugins/gitlab/tasks/pipeline_collector.go
+++ b/plugins/gitlab/tasks/pipeline_collector.go
@@ -42,7 +42,7 @@ func CollectApiPipelines(taskCtx core.SubTaskContext)
errors.Error {
return err
}
- incremental := collectorWithState.CanIncrementCollect()
+ incremental := collectorWithState.IsIncremental()
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
diff --git a/plugins/helper/api_collector_with_state.go
b/plugins/helper/api_collector_with_state.go
index 2e2064b74..c8a7a648a 100644
--- a/plugins/helper/api_collector_with_state.go
+++ b/plugins/helper/api_collector_with_state.go
@@ -63,12 +63,12 @@ func NewApiCollectorWithState(args RawDataSubTaskArgs,
createdDateAfter *time.Ti
}, nil
}
-// CanIncrementCollect return if the old data can support collect
incrementally.
+// IsIncremental return if the old data can support collect incrementally.
// only when latest collection is success &&
// (m.LatestState.CreatedDateAfter == nil means all data have been collected ||
// CreatedDateAfter at this time exists and no before than in the LatestState)
// if CreatedDateAfter at this time not exists, collect incrementally only
when "m.LatestState.CreatedDateAfter == nil"
-func (m ApiCollectorStateManager) CanIncrementCollect() bool {
+func (m ApiCollectorStateManager) IsIncremental() bool {
return m.LatestState.LatestSuccessStart != nil &&
(m.LatestState.CreatedDateAfter == nil || m.CreatedDateAfter !=
nil && !m.CreatedDateAfter.Before(*m.LatestState.CreatedDateAfter))
}
diff --git a/plugins/jira/tasks/issue_changelog_collector.go
b/plugins/jira/tasks/issue_changelog_collector.go
index 30c47cdfe..4bf46f4e9 100644
--- a/plugins/jira/tasks/issue_changelog_collector.go
+++ b/plugins/jira/tasks/issue_changelog_collector.go
@@ -73,7 +73,7 @@ func CollectIssueChangelogs(taskCtx core.SubTaskContext)
errors.Error {
dal.Where("i.updated > i.created AND bi.connection_id = ? AND
bi.board_id = ? AND i.std_type != ? ", data.Options.ConnectionId,
data.Options.BoardId, "Epic"),
dal.Groupby("i.issue_id, i.updated"),
}
- incremental := collectorWithState.CanIncrementCollect()
+ incremental := collectorWithState.IsIncremental()
if incremental {
clauses = append(clauses, dal.Having("i.updated >
max(c.issue_updated) OR (max(c.issue_updated) IS NULL AND
COUNT(c.changelog_id) > 0)"))
}
diff --git a/plugins/jira/tasks/issue_collector.go
b/plugins/jira/tasks/issue_collector.go
index 50a8504f8..91ebee2a1 100644
--- a/plugins/jira/tasks/issue_collector.go
+++ b/plugins/jira/tasks/issue_collector.go
@@ -76,7 +76,7 @@ func CollectIssues(taskCtx core.SubTaskContext) errors.Error {
jql = fmt.Sprintf("created >= '%v' %v",
createdDateAfter.Format("2006/01/02 15:04"), jql)
}
- incremental := collectorWithState.CanIncrementCollect()
+ incremental := collectorWithState.IsIncremental()
if incremental {
// user didn't specify a time range to sync, try load from
database
var latestUpdated models.JiraIssue
diff --git a/plugins/jira/tasks/remotelink_collector.go
b/plugins/jira/tasks/remotelink_collector.go
index 4867225ad..504ad509b 100644
--- a/plugins/jira/tasks/remotelink_collector.go
+++ b/plugins/jira/tasks/remotelink_collector.go
@@ -67,7 +67,7 @@ func CollectRemotelinks(taskCtx core.SubTaskContext)
errors.Error {
dal.Where("i.updated > i.created AND bi.connection_id = ? AND
bi.board_id = ? ", data.Options.ConnectionId, data.Options.BoardId),
dal.Groupby("i.issue_id, i.updated"),
}
- incremental := collectorWithState.CanIncrementCollect()
+ incremental := collectorWithState.IsIncremental()
if incremental {
if collectorWithState.LatestState.LatestSuccessStart != nil {
clauses = append(clauses, dal.Having("i.updated > ? AND
(i.updated > max(rl.issue_updated) OR max(rl.issue_updated) IS NULL)",
collectorWithState.LatestState.LatestSuccessStart))
diff --git a/plugins/jira/tasks/worklog_collector.go
b/plugins/jira/tasks/worklog_collector.go
index 3958a81a4..fc3d84392 100644
--- a/plugins/jira/tasks/worklog_collector.go
+++ b/plugins/jira/tasks/worklog_collector.go
@@ -66,7 +66,7 @@ func CollectWorklogs(taskCtx core.SubTaskContext)
errors.Error {
dal.Where("i.updated > i.created AND bi.connection_id = ? AND
bi.board_id = ? ", data.Options.ConnectionId, data.Options.BoardId),
dal.Groupby("i.issue_id, i.updated"),
}
- incremental := collectorWithState.CanIncrementCollect()
+ incremental := collectorWithState.IsIncremental()
if incremental {
clauses = append(clauses, dal.Having("i.updated >
max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND
COUNT(wl.worklog_id) > 0)"))
}
diff --git a/plugins/tapd/impl/impl.go b/plugins/tapd/impl/impl.go
index ebfdc7dba..2f977dba6 100644
--- a/plugins/tapd/impl/impl.go
+++ b/plugins/tapd/impl/impl.go
@@ -163,6 +163,7 @@ func (plugin Tapd) SubTaskMetas() []core.SubTaskMeta {
}
func (plugin Tapd) PrepareTaskData(taskCtx core.TaskContext, options
map[string]interface{}) (interface{}, errors.Error) {
+ logger := taskCtx.GetLogger()
var op tasks.TapdOptions
err := helper.Decode(options, &op, nil)
if err != nil {
@@ -180,16 +181,8 @@ func (plugin Tapd) PrepareTaskData(taskCtx
core.TaskContext, options map[string]
if err != nil {
return nil, err
}
-
- var since time.Time
- if op.Since != "" {
- since, err = errors.Convert01(time.Parse(time.RFC3339,
op.Since))
- if err != nil {
- return nil, errors.BadInput.Wrap(err, "invalid value
for `since`")
- }
- }
if connection.RateLimitPerHour == 0 {
- connection.RateLimitPerHour = 6000
+ connection.RateLimitPerHour = 3600
}
tapdApiClient, err := tasks.NewTapdApiClient(taskCtx, connection)
if err != nil {
@@ -205,8 +198,16 @@ func (plugin Tapd) PrepareTaskData(taskCtx
core.TaskContext, options map[string]
ApiClient: tapdApiClient,
Connection: connection,
}
- if !since.IsZero() {
- taskData.Since = &since
+ var createdDateAfter time.Time
+ if op.CreatedDateAfter != "" {
+ createdDateAfter, err =
errors.Convert01(time.Parse(time.RFC3339, op.CreatedDateAfter))
+ if err != nil {
+ return nil, errors.BadInput.Wrap(err, "invalid value
for `createdDateAfter`")
+ }
+ }
+ if !createdDateAfter.IsZero() {
+ taskData.CreatedDateAfter = &createdDateAfter
+ logger.Debug("collect data updated createdDateAfter %s",
createdDateAfter)
}
return taskData, nil
}
diff --git a/plugins/tapd/tapd.go b/plugins/tapd/tapd.go
index b741ffd25..6c95a5eed 100644
--- a/plugins/tapd/tapd.go
+++ b/plugins/tapd/tapd.go
@@ -31,6 +31,7 @@ func main() {
connectionId := cmd.Flags().Uint64P("connection", "c", 0, "tapd
connection id")
workspaceId := cmd.Flags().Uint64P("workspace", "w", 0, "tapd workspace
id")
companyId := cmd.Flags().Uint64P("company", "o", 0, "tapd company id")
+ createdDateAfter := cmd.Flags().StringP("createdDateAfter", "a", "",
"collect data that are created after specified time, ie 2006-05-06T07:08:09Z")
err := cmd.MarkFlagRequired("connection")
if err != nil {
panic(err)
@@ -42,9 +43,10 @@ func main() {
cmd.Run = func(c *cobra.Command, args []string) {
runner.DirectRun(c, args, PluginEntry, map[string]interface{}{
- "connectionId": *connectionId,
- "workspaceId": *workspaceId,
- "companyId": *companyId,
+ "connectionId": *connectionId,
+ "workspaceId": *workspaceId,
+ "companyId": *companyId,
+ "createdDateAfter": *createdDateAfter,
})
//
// cfg := config.GetConfig()
diff --git a/plugins/tapd/tasks/bug_changelog_collector.go
b/plugins/tapd/tasks/bug_changelog_collector.go
index 6b395c297..20c242c66 100644
--- a/plugins/tapd/tasks/bug_changelog_collector.go
+++ b/plugins/tapd/tasks/bug_changelog_collector.go
@@ -19,15 +19,11 @@ package tasks
import (
"fmt"
- "net/url"
- "time"
-
"github.com/apache/incubator-devlake/errors"
+ "net/url"
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
- "github.com/apache/incubator-devlake/plugins/tapd/models"
)
const RAW_BUG_CHANGELOG_TABLE = "tapd_api_bug_changelogs"
@@ -36,42 +32,34 @@ var _ core.SubTaskEntryPoint = CollectBugChangelogs
func CollectBugChangelogs(taskCtx core.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_BUG_CHANGELOG_TABLE, false)
- db := taskCtx.GetDal()
logger := taskCtx.GetLogger()
logger.Info("collect storyChangelogs")
- since := data.Since
- incremental := false
- if since == nil {
- // user didn't specify a time range to sync, try load from
database
- var latestUpdated models.TapdBugChangelog
- clauses := []dal.Clause{
- dal.Where("connection_id = ? and workspace_id = ?",
data.Options.ConnectionId, data.Options.WorkspaceId),
- dal.Orderby("created DESC"),
- }
- err := db.First(&latestUpdated, clauses...)
- if err != nil && !db.IsErrorNotFound(err) {
- return errors.NotFound.Wrap(err, "failed to get latest
tapd changelog record")
- }
- if latestUpdated.Id > 0 {
- since = (*time.Time)(latestUpdated.Created)
- incremental = true
- }
+ collectorWithState, err :=
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+ if err != nil {
+ return err
}
+ incremental := collectorWithState.IsIncremental()
- collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
- RawDataSubTaskArgs: *rawDataSubTaskArgs,
- Incremental: incremental,
- ApiClient: data.ApiClient,
- PageSize: 100,
- UrlTemplate: "bug_changes",
+ err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
+ UrlTemplate: "bug_changes",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("workspace_id", fmt.Sprintf("%v",
data.Options.WorkspaceId))
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
- query.Set("order", "created asc")
- if since != nil {
- query.Set("created", fmt.Sprintf(">%s",
since.In(data.Options.CstZone).Format("2006-01-02")))
+ query.Set("order", "created%20desc")
+ if data.CreatedDateAfter != nil {
+ query.Set("created",
+ fmt.Sprintf(">%s",
+
data.CreatedDateAfter.In(data.Options.CstZone).Format("2006-01-02")))
+ }
+ if incremental {
+ query.Set("created",
+ fmt.Sprintf(">%s",
+
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
@@ -81,7 +69,7 @@ func CollectBugChangelogs(taskCtx core.SubTaskContext)
errors.Error {
logger.Error(err, "collect story changelog error")
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}
var CollectBugChangelogMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/bug_collector.go
b/plugins/tapd/tasks/bug_collector.go
index 570d2824c..796878b82 100644
--- a/plugins/tapd/tasks/bug_collector.go
+++ b/plugins/tapd/tasks/bug_collector.go
@@ -19,15 +19,11 @@ package tasks
import (
"fmt"
- "net/url"
- "time"
-
"github.com/apache/incubator-devlake/errors"
+ "net/url"
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
- "github.com/apache/incubator-devlake/plugins/tapd/models"
)
const RAW_BUG_TABLE = "tapd_api_bugs"
@@ -36,34 +32,19 @@ var _ core.SubTaskEntryPoint = CollectBugs
func CollectBugs(taskCtx core.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_BUG_TABLE, false)
- db := taskCtx.GetDal()
logger := taskCtx.GetLogger()
logger.Info("collect bugs")
- since := data.Since
- incremental := false
- if since == nil {
- // user didn't specify a time range to sync, try load from
database
- var latestUpdated models.TapdBug
- clauses := []dal.Clause{
- dal.Where("connection_id = ? and workspace_id = ?",
data.Options.ConnectionId, data.Options.WorkspaceId),
- dal.Orderby("modified DESC"),
- }
- err := db.First(&latestUpdated, clauses...)
- if err != nil && !db.IsErrorNotFound(err) {
- return errors.Default.Wrap(err, "failed to get latest
tapd changelog record")
- }
- if latestUpdated.Id > 0 {
- since = (*time.Time)(latestUpdated.Modified)
- incremental = true
- }
+ collectorWithState, err :=
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+ if err != nil {
+ return err
}
+ incremental := collectorWithState.IsIncremental()
- collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
- RawDataSubTaskArgs: *rawDataSubTaskArgs,
- Incremental: incremental,
- ApiClient: data.ApiClient,
- PageSize: 100,
- UrlTemplate: "bugs",
+ err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
+ UrlTemplate: "bugs",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("workspace_id", fmt.Sprintf("%v",
data.Options.WorkspaceId))
@@ -71,8 +52,15 @@ func CollectBugs(taskCtx core.SubTaskContext) errors.Error {
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if since != nil {
- query.Set("modified", fmt.Sprintf(">%s",
fmt.Sprintf(">%s", since.In(data.Options.CstZone).Format("2006-01-02"))))
+ if data.CreatedDateAfter != nil {
+ query.Set("created",
+ fmt.Sprintf(">%s",
+
data.CreatedDateAfter.In(data.Options.CstZone).Format("2006-01-02")))
+ }
+ if incremental {
+ query.Set("modified",
+ fmt.Sprintf(">%s",
+
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
@@ -82,7 +70,7 @@ func CollectBugs(taskCtx core.SubTaskContext) errors.Error {
logger.Error(err, "collect bug error")
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}
var CollectBugMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/bug_commit_collector.go
b/plugins/tapd/tasks/bug_commit_collector.go
index b8a2b5fdc..928ec36f3 100644
--- a/plugins/tapd/tasks/bug_commit_collector.go
+++ b/plugins/tapd/tasks/bug_commit_collector.go
@@ -37,16 +37,23 @@ var _ core.SubTaskEntryPoint = CollectBugCommits
func CollectBugCommits(taskCtx core.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_BUG_COMMIT_TABLE, false)
db := taskCtx.GetDal()
+ collectorWithState, err :=
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+ if err != nil {
+ return err
+ }
logger := taskCtx.GetLogger()
logger.Info("collect issueCommits")
- since := data.Since
+ incremental := collectorWithState.IsIncremental()
clauses := []dal.Clause{
dal.Select("_tool_tapd_bugs.id as issue_id, modified as
update_time"),
dal.From(&models.TapdBug{}),
dal.Where("_tool_tapd_bugs.connection_id = ? and
_tool_tapd_bugs.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if since != nil {
- clauses = append(clauses, dal.Where("modified > ?", since))
+ if collectorWithState.CreatedDateAfter != nil {
+ clauses = append(clauses, dal.Where("created > ?",
*collectorWithState.CreatedDateAfter))
+ }
+ if incremental {
+ clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.LatestState.LatestSuccessStart))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -57,13 +64,12 @@ func CollectBugCommits(taskCtx core.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
- RawDataSubTaskArgs: *rawDataSubTaskArgs,
- Incremental: since != nil,
- ApiClient: data.ApiClient,
- PageSize: 100,
- Input: iterator,
- UrlTemplate: "code_commit_infos",
+ err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ ApiClient: data.ApiClient,
+ PageSize: 100,
+ Incremental: incremental,
+ Input: iterator,
+ UrlTemplate: "code_commit_infos",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
input := reqData.Input.(*models.Input)
query := url.Values{}
@@ -85,7 +91,7 @@ func CollectBugCommits(taskCtx core.SubTaskContext)
errors.Error {
logger.Error(err, "collect issueCommit error")
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}
var CollectBugCommitMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/iteration_collector.go
b/plugins/tapd/tasks/iteration_collector.go
index a3b379be3..beae5fe95 100644
--- a/plugins/tapd/tasks/iteration_collector.go
+++ b/plugins/tapd/tasks/iteration_collector.go
@@ -20,16 +20,12 @@ package tasks
import (
"encoding/json"
"fmt"
+ "github.com/apache/incubator-devlake/errors"
"net/http"
"net/url"
- "time"
-
- "github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
- "github.com/apache/incubator-devlake/plugins/tapd/models"
)
const RAW_ITERATION_TABLE = "tapd_api_iterations"
@@ -38,42 +34,35 @@ var _ core.SubTaskEntryPoint = CollectIterations
func CollectIterations(taskCtx core.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_ITERATION_TABLE, false)
- db := taskCtx.GetDal()
logger := taskCtx.GetLogger()
logger.Info("collect iterations")
- since := data.Since
- incremental := false
- if since == nil {
- // user didn't specify a time range to sync, try load from
database
- var latestUpdated models.TapdIteration
- clauses := []dal.Clause{
- dal.Where("connection_id = ? and workspace_id = ?",
data.Options.ConnectionId, data.Options.WorkspaceId),
- dal.Orderby("modified DESC"),
- }
- err := db.First(&latestUpdated, clauses...)
- if err != nil && !db.IsErrorNotFound(err) {
- return errors.NotFound.Wrap(err, "failed to get latest
tapd changelog record")
- }
- if latestUpdated.Id > 0 {
- since = (*time.Time)(latestUpdated.Modified)
- incremental = true
- }
+ collectorWithState, err :=
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+ if err != nil {
+ return err
}
- collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
- RawDataSubTaskArgs: *rawDataSubTaskArgs,
- Incremental: incremental,
- ApiClient: data.ApiClient,
- PageSize: 100,
- Concurrency: 3,
- UrlTemplate: "iterations",
+ incremental := collectorWithState.IsIncremental()
+
+ err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
+ Concurrency: 3,
+ UrlTemplate: "iterations",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("workspace_id", fmt.Sprintf("%v",
data.Options.WorkspaceId))
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if since != nil {
- query.Set("modified", fmt.Sprintf(">%s",
since.In(data.Options.CstZone).Format("2006-01-02")))
+ if data.CreatedDateAfter != nil {
+ query.Set("created",
+ fmt.Sprintf(">%s",
+
data.CreatedDateAfter.In(data.Options.CstZone).Format("2006-01-02")))
+ }
+ if incremental {
+ query.Set("modified",
+ fmt.Sprintf(">%s",
+
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
@@ -89,7 +78,7 @@ func CollectIterations(taskCtx core.SubTaskContext)
errors.Error {
logger.Error(err, "collect iteration error")
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}
var CollectIterationMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/story_bug_collector.go
b/plugins/tapd/tasks/story_bug_collector.go
index 02181fd28..e5b266633 100644
--- a/plugins/tapd/tasks/story_bug_collector.go
+++ b/plugins/tapd/tasks/story_bug_collector.go
@@ -35,16 +35,23 @@ var _ core.SubTaskEntryPoint = CollectStoryBugs
func CollectStoryBugs(taskCtx core.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_STORY_BUG_TABLE, false)
db := taskCtx.GetDal()
+ collectorWithState, err :=
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+ if err != nil {
+ return err
+ }
logger := taskCtx.GetLogger()
logger.Info("collect storyBugs")
- since := data.Since
+ incremental := collectorWithState.IsIncremental()
clauses := []dal.Clause{
dal.Select("id as issue_id, modified as update_time"),
dal.From(&models.TapdStory{}),
dal.Where("_tool_tapd_stories.connection_id = ? and
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if since != nil {
- clauses = append(clauses, dal.Where("modified > ?", since))
+ if collectorWithState.CreatedDateAfter != nil {
+ clauses = append(clauses, dal.Where("created > ?",
*collectorWithState.CreatedDateAfter))
+ }
+ if incremental {
+ clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.LatestState.LatestSuccessStart))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -54,12 +61,12 @@ func CollectStoryBugs(taskCtx core.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
- RawDataSubTaskArgs: *rawDataSubTaskArgs,
- ApiClient: data.ApiClient,
- Incremental: since != nil,
- Input: iterator,
- UrlTemplate: "stories/get_related_bugs",
+ err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ ApiClient: data.ApiClient,
+ PageSize: 100,
+ Incremental: incremental,
+ Input: iterator,
+ UrlTemplate: "stories/get_related_bugs",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
input := reqData.Input.(*models.Input)
query := url.Values{}
@@ -73,7 +80,7 @@ func CollectStoryBugs(taskCtx core.SubTaskContext)
errors.Error {
logger.Error(err, "collect storyBug error")
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}
var CollectStoryBugMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/story_changelog_collector.go
b/plugins/tapd/tasks/story_changelog_collector.go
index 9de0e97bb..e88e2e49d 100644
--- a/plugins/tapd/tasks/story_changelog_collector.go
+++ b/plugins/tapd/tasks/story_changelog_collector.go
@@ -19,14 +19,10 @@ package tasks
import (
"fmt"
- "net/url"
- "time"
-
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
- "github.com/apache/incubator-devlake/plugins/tapd/models"
+ "net/url"
)
const RAW_STORY_CHANGELOG_TABLE = "tapd_api_story_changelogs"
@@ -35,42 +31,34 @@ var _ core.SubTaskEntryPoint = CollectStoryChangelogs
func CollectStoryChangelogs(taskCtx core.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_STORY_CHANGELOG_TABLE, false)
- db := taskCtx.GetDal()
+ collectorWithState, err :=
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+ if err != nil {
+ return err
+ }
logger := taskCtx.GetLogger()
logger.Info("collect storyChangelogs")
- since := data.Since
- incremental := false
- if since == nil {
- // user didn't specify a time range to sync, try load from
database
- var latestUpdated models.TapdStoryChangelog
- clauses := []dal.Clause{
- dal.Where("connection_id = ? and workspace_id = ?",
data.Options.ConnectionId, data.Options.WorkspaceId),
- dal.Orderby("created DESC"),
- }
- err := db.First(&latestUpdated, clauses...)
- if err != nil && !db.IsErrorNotFound(err) {
- return errors.NotFound.Wrap(err, "failed to get latest
tapd changelog record")
- }
- if latestUpdated.Id > 0 {
- since = (*time.Time)(latestUpdated.Created)
- incremental = true
- }
- }
+ incremental := collectorWithState.IsIncremental()
- collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
- RawDataSubTaskArgs: *rawDataSubTaskArgs,
- Incremental: incremental,
- ApiClient: data.ApiClient,
- PageSize: 100,
- UrlTemplate: "story_changes",
+ err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
+ UrlTemplate: "story_changes",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("workspace_id", fmt.Sprintf("%v",
data.Options.WorkspaceId))
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if since != nil {
- query.Set("created", fmt.Sprintf(">%s",
since.In(data.Options.CstZone).Format("2006-01-02")))
+ if data.CreatedDateAfter != nil {
+ query.Set("created",
+ fmt.Sprintf(">%s",
+
data.CreatedDateAfter.In(data.Options.CstZone).Format("2006-01-02")))
+ }
+ if incremental {
+ query.Set("created",
+ fmt.Sprintf(">%s",
+
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
@@ -80,7 +68,7 @@ func CollectStoryChangelogs(taskCtx core.SubTaskContext)
errors.Error {
logger.Error(err, "collect story changelog error")
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}
var CollectStoryChangelogMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/story_collector.go
b/plugins/tapd/tasks/story_collector.go
index 887f4671a..7e845c882 100644
--- a/plugins/tapd/tasks/story_collector.go
+++ b/plugins/tapd/tasks/story_collector.go
@@ -19,15 +19,11 @@ package tasks
import (
"fmt"
- "net/url"
- "time"
-
"github.com/apache/incubator-devlake/errors"
+ "net/url"
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
- "github.com/apache/incubator-devlake/plugins/tapd/models"
)
const RAW_STORY_TABLE = "tapd_api_stories"
@@ -36,33 +32,18 @@ var _ core.SubTaskEntryPoint = CollectStorys
func CollectStorys(taskCtx core.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_STORY_TABLE, false)
- db := taskCtx.GetDal()
logger := taskCtx.GetLogger()
logger.Info("collect stories")
- since := data.Since
- incremental := false
- if since == nil {
- // user didn't specify a time range to sync, try load from
database
- var latestUpdated models.TapdStory
- clauses := []dal.Clause{
- dal.Where("connection_id = ? and workspace_id = ?",
data.Options.ConnectionId, data.Options.WorkspaceId),
- dal.Orderby("modified DESC"),
- }
- err := db.First(&latestUpdated, clauses...)
- if err != nil && !db.IsErrorNotFound(err) {
- return errors.Default.Wrap(err, "failed to get latest
tapd changelog record")
- }
- if latestUpdated.Id > 0 {
- since = (*time.Time)(latestUpdated.Modified)
- incremental = true
- }
+ collectorWithState, err :=
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+ if err != nil {
+ return err
}
- collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
- RawDataSubTaskArgs: *rawDataSubTaskArgs,
- Incremental: incremental,
- ApiClient: data.ApiClient,
- PageSize: 100,
- UrlTemplate: "stories",
+ incremental := collectorWithState.IsIncremental()
+ err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
+ UrlTemplate: "stories",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("workspace_id", fmt.Sprintf("%v",
data.Options.WorkspaceId))
@@ -70,8 +51,15 @@ func CollectStorys(taskCtx core.SubTaskContext) errors.Error
{
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if since != nil {
- query.Set("modified", fmt.Sprintf(">%s",
since.In(data.Options.CstZone).Format("2006-01-02")))
+ if data.CreatedDateAfter != nil {
+ query.Set("created",
+ fmt.Sprintf(">%s",
+
data.CreatedDateAfter.In(data.Options.CstZone).Format("2006-01-02")))
+ }
+ if incremental {
+ query.Set("modified",
+ fmt.Sprintf(">%s",
+
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
@@ -81,7 +69,7 @@ func CollectStorys(taskCtx core.SubTaskContext) errors.Error {
logger.Error(err, "collect story error")
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}
var CollectStoryMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/story_commit_collector.go
b/plugins/tapd/tasks/story_commit_collector.go
index 63194c8d3..8178832ef 100644
--- a/plugins/tapd/tasks/story_commit_collector.go
+++ b/plugins/tapd/tasks/story_commit_collector.go
@@ -37,16 +37,23 @@ var _ core.SubTaskEntryPoint = CollectStoryCommits
func CollectStoryCommits(taskCtx core.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_STORY_COMMIT_TABLE, false)
db := taskCtx.GetDal()
+ collectorWithState, err :=
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+ if err != nil {
+ return err
+ }
logger := taskCtx.GetLogger()
logger.Info("collect issueCommits")
- since := data.Since
+ incremental := collectorWithState.IsIncremental()
clauses := []dal.Clause{
dal.Select("_tool_tapd_stories.id as issue_id, modified as
update_time"),
dal.From(&models.TapdStory{}),
dal.Where("_tool_tapd_stories.connection_id = ? and
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if since != nil {
- clauses = append(clauses, dal.Where("modified > ?", since))
+ if collectorWithState.CreatedDateAfter != nil {
+ clauses = append(clauses, dal.Where("created > ?",
*collectorWithState.CreatedDateAfter))
+ }
+ if incremental {
+ clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.LatestState.LatestSuccessStart))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -56,12 +63,12 @@ func CollectStoryCommits(taskCtx core.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
- RawDataSubTaskArgs: *rawDataSubTaskArgs,
- Incremental: since != nil,
- ApiClient: data.ApiClient,
- Input: iterator,
- UrlTemplate: "code_commit_infos",
+ err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ ApiClient: data.ApiClient,
+ PageSize: 100,
+ Incremental: incremental,
+ Input: iterator,
+ UrlTemplate: "code_commit_infos",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
input := reqData.Input.(*models.Input)
query := url.Values{}
@@ -83,7 +90,7 @@ func CollectStoryCommits(taskCtx core.SubTaskContext)
errors.Error {
logger.Error(err, "collect issueCommit error")
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}
var CollectStoryCommitMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/task_changelog_collector.go
b/plugins/tapd/tasks/task_changelog_collector.go
index 88cb80ede..2fac3d27b 100644
--- a/plugins/tapd/tasks/task_changelog_collector.go
+++ b/plugins/tapd/tasks/task_changelog_collector.go
@@ -19,15 +19,11 @@ package tasks
import (
"fmt"
- "net/url"
- "time"
-
"github.com/apache/incubator-devlake/errors"
+ "net/url"
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
- "github.com/apache/incubator-devlake/plugins/tapd/models"
)
const RAW_TASK_CHANGELOG_TABLE = "tapd_api_task_changelogs"
@@ -36,42 +32,34 @@ var _ core.SubTaskEntryPoint = CollectTaskChangelogs
func CollectTaskChangelogs(taskCtx core.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_TASK_CHANGELOG_TABLE, false)
- db := taskCtx.GetDal()
+ collectorWithState, err :=
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+ if err != nil {
+ return err
+ }
logger := taskCtx.GetLogger()
logger.Info("collect taskChangelogs")
- since := data.Since
- incremental := false
- if since == nil {
- // user didn't specify a time range to sync, try load from
database
- var latestUpdated models.TapdTaskChangelog
- clauses := []dal.Clause{
- dal.Where("connection_id = ? and workspace_id = ?",
data.Options.ConnectionId, data.Options.WorkspaceId),
- dal.Orderby("created DESC"),
- }
- err := db.First(&latestUpdated, clauses...)
- if err != nil && !db.IsErrorNotFound(err) {
- return errors.NotFound.Wrap(err, "failed to get latest
tapd changelog record")
- }
- if latestUpdated.Id > 0 {
- since = (*time.Time)(latestUpdated.Created)
- incremental = true
- }
- }
+ incremental := collectorWithState.IsIncremental()
- collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
- RawDataSubTaskArgs: *rawDataSubTaskArgs,
- Incremental: incremental,
- ApiClient: data.ApiClient,
- PageSize: 100,
- UrlTemplate: "task_changes",
+ err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
+ UrlTemplate: "task_changes",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("workspace_id", fmt.Sprintf("%v",
data.Options.WorkspaceId))
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if since != nil {
- query.Set("created", fmt.Sprintf(">%s",
since.In(data.Options.CstZone).Format("2006-01-02")))
+ if data.CreatedDateAfter != nil {
+ query.Set("created",
+ fmt.Sprintf(">%s",
+
data.CreatedDateAfter.In(data.Options.CstZone).Format("2006-01-02")))
+ }
+ if incremental {
+ query.Set("created",
+ fmt.Sprintf(">%s",
+
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
@@ -81,7 +69,7 @@ func CollectTaskChangelogs(taskCtx core.SubTaskContext)
errors.Error {
logger.Error(err, "collect task changelog error")
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}
var CollectTaskChangelogMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/task_collector.go
b/plugins/tapd/tasks/task_collector.go
index 68a40fd15..7d8ef405b 100644
--- a/plugins/tapd/tasks/task_collector.go
+++ b/plugins/tapd/tasks/task_collector.go
@@ -19,15 +19,11 @@ package tasks
import (
"fmt"
- "net/url"
- "time"
-
"github.com/apache/incubator-devlake/errors"
+ "net/url"
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
- "github.com/apache/incubator-devlake/plugins/tapd/models"
)
const RAW_TASK_TABLE = "tapd_api_tasks"
@@ -36,29 +32,13 @@ var _ core.SubTaskEntryPoint = CollectTasks
func CollectTasks(taskCtx core.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_TASK_TABLE, false)
- db := taskCtx.GetDal()
-
logger := taskCtx.GetLogger()
logger.Info("collect tasks")
-
- since := data.Since
- incremental := false
- if since == nil {
- // user didn't specify a time range to sync, try load from
database
- var latestUpdated models.TapdTask
- clauses := []dal.Clause{
- dal.Where("connection_id = ? and workspace_id = ?",
data.Options.ConnectionId, data.Options.WorkspaceId),
- dal.Orderby("modified DESC"),
- }
- err := db.First(&latestUpdated, clauses...)
- if err != nil && !db.IsErrorNotFound(err) {
- return errors.NotFound.Wrap(err, "failed to get latest
tapd changelog record")
- }
- if latestUpdated.Id > 0 {
- since = (*time.Time)(latestUpdated.Modified)
- incremental = true
- }
+ collectorWithState, err :=
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+ if err != nil {
+ return err
}
+ incremental := collectorWithState.IsIncremental()
collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
@@ -73,8 +53,15 @@ func CollectTasks(taskCtx core.SubTaskContext) errors.Error {
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if since != nil {
- query.Set("modified", fmt.Sprintf(">%s",
since.In(data.Options.CstZone).Format("2006-01-02")))
+ if data.CreatedDateAfter != nil {
+ query.Set("created",
+ fmt.Sprintf(">%s",
+
data.CreatedDateAfter.In(data.Options.CstZone).Format("2006-01-02")))
+ }
+ if incremental {
+ query.Set("modified",
+ fmt.Sprintf(">%s",
+
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/plugins/tapd/tasks/task_commit_collector.go
b/plugins/tapd/tasks/task_commit_collector.go
index f7a6bcb59..7410cc76c 100644
--- a/plugins/tapd/tasks/task_commit_collector.go
+++ b/plugins/tapd/tasks/task_commit_collector.go
@@ -37,16 +37,23 @@ var _ core.SubTaskEntryPoint = CollectTaskCommits
func CollectTaskCommits(taskCtx core.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_TASK_COMMIT_TABLE, false)
db := taskCtx.GetDal()
+ collectorWithState, err :=
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+ if err != nil {
+ return err
+ }
logger := taskCtx.GetLogger()
logger.Info("collect issueCommits")
- since := data.Since
+ incremental := collectorWithState.IsIncremental()
clauses := []dal.Clause{
dal.Select("_tool_tapd_tasks.id as issue_id, modified as
update_time"),
dal.From(&models.TapdTask{}),
dal.Where("_tool_tapd_tasks.connection_id = ? and
_tool_tapd_tasks.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if since != nil {
- clauses = append(clauses, dal.Where("modified > ?", since))
+ if collectorWithState.CreatedDateAfter != nil {
+ clauses = append(clauses, dal.Where("created > ?",
*collectorWithState.CreatedDateAfter))
+ }
+ if incremental {
+ clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.LatestState.LatestSuccessStart))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -56,10 +63,10 @@ func CollectTaskCommits(taskCtx core.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
- RawDataSubTaskArgs: *rawDataSubTaskArgs,
- Incremental: since != nil,
- ApiClient: data.ApiClient,
+ err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ ApiClient: data.ApiClient,
+ PageSize: 100,
+ Incremental: incremental,
//PageSize: 100,
Input: iterator,
UrlTemplate: "code_commit_infos",
@@ -84,7 +91,7 @@ func CollectTaskCommits(taskCtx core.SubTaskContext)
errors.Error {
logger.Error(err, "collect issueCommit error")
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}
var CollectTaskCommitMeta = core.SubTaskMeta{
diff --git a/plugins/tapd/tasks/task_data.go b/plugins/tapd/tasks/task_data.go
index 49680deaf..b14a9d524 100644
--- a/plugins/tapd/tasks/task_data.go
+++ b/plugins/tapd/tasks/task_data.go
@@ -29,16 +29,16 @@ type TapdOptions struct {
WorkspaceId uint64 `mapstruct:"workspaceId"`
CompanyId uint64 `mapstruct:"companyId"`
Tasks []string `mapstruct:"tasks,omitempty"`
- Since string
+ CreatedDateAfter string `json:"createdDateAfter"
mapstructure:"createdDateAfter,omitempty"`
CstZone *time.Location
TransformationRules TransformationRules `json:"transformationRules"`
}
type TapdTaskData struct {
- Options *TapdOptions
- ApiClient *helper.ApiAsyncClient
- Since *time.Time
- Connection *models.TapdConnection
+ Options *TapdOptions
+ ApiClient *helper.ApiAsyncClient
+ CreatedDateAfter *time.Time
+ Connection *models.TapdConnection
}
type TypeMapping struct {
diff --git a/plugins/tapd/tasks/worklog_collector.go
b/plugins/tapd/tasks/worklog_collector.go
index 084b01820..7ce7ca908 100644
--- a/plugins/tapd/tasks/worklog_collector.go
+++ b/plugins/tapd/tasks/worklog_collector.go
@@ -19,15 +19,11 @@ package tasks
import (
"fmt"
- "net/url"
- "time"
-
"github.com/apache/incubator-devlake/errors"
+ "net/url"
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
- "github.com/apache/incubator-devlake/plugins/tapd/models"
)
const RAW_WORKLOG_TABLE = "tapd_api_worklogs"
@@ -36,41 +32,34 @@ var _ core.SubTaskEntryPoint = CollectWorklogs
func CollectWorklogs(taskCtx core.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_WORKLOG_TABLE, false)
- db := taskCtx.GetDal()
logger := taskCtx.GetLogger()
logger.Info("collect worklogs")
- since := data.Since
- incremental := false
- if since == nil {
- // user didn't specify a time range to sync, try load from
database
- var latestUpdated models.TapdWorklog
- clauses := []dal.Clause{
- dal.Where("connection_id = ? and workspace_id = ?",
data.Options.ConnectionId, data.Options.WorkspaceId),
- dal.Orderby("created DESC"),
- }
- err := db.First(&latestUpdated, clauses...)
- if err != nil && !db.IsErrorNotFound(err) {
- return errors.NotFound.Wrap(err, "failed to get latest
tapd changelog record")
- }
- if latestUpdated.Id > 0 {
- since = (*time.Time)(latestUpdated.Created)
- incremental = true
- }
+ collectorWithState, err :=
helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
+ if err != nil {
+ return err
}
- collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
- RawDataSubTaskArgs: *rawDataSubTaskArgs,
- Incremental: incremental,
- ApiClient: data.ApiClient,
- PageSize: 100,
- UrlTemplate: "timesheets",
+ incremental := collectorWithState.IsIncremental()
+
+ err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: 100,
+ UrlTemplate: "timesheets",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("workspace_id", fmt.Sprintf("%v",
data.Options.WorkspaceId))
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if since != nil {
- query.Set("created", fmt.Sprintf(">%s",
since.In(data.Options.CstZone).Format("2006-01-02")))
+ if data.CreatedDateAfter != nil {
+ query.Set("created",
+ fmt.Sprintf(">%s",
+
data.CreatedDateAfter.In(data.Options.CstZone).Format("2006-01-02")))
+ }
+ if incremental {
+ query.Set("created",
+ fmt.Sprintf(">%s",
+
collectorWithState.LatestState.LatestSuccessStart.In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
@@ -80,7 +69,7 @@ func CollectWorklogs(taskCtx core.SubTaskContext)
errors.Error {
logger.Error(err, "collect worklog error")
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}
var CollectWorklogMeta = core.SubTaskMeta{