This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch feat#5841-4
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/feat#5841-4 by this push:
     new 06f3cf2b4 refactor: syncPolicy timeAfter
06f3cf2b4 is described below

commit 06f3cf2b4f7df6e5e2fa180a0bbcc28183541f8c
Author: abeizn <[email protected]>
AuthorDate: Tue Sep 12 17:45:39 2023 +0800

    refactor: syncPolicy timeAfter
---
 backend/core/models/blueprint.go                   |  4 +--
 ...dd_full_sync.go => 20230911_add_sync_policy.go} | 33 ++++++++++++---------
 backend/core/models/migrationscripts/register.go   |  2 +-
 backend/core/models/pipeline.go                    |  6 ++--
 backend/core/plugin/plugin_blueprint.go            |  1 -
 backend/core/runner/run_task.go                    | 12 ++++++--
 .../pluginhelper/api/api_collector_with_state.go   | 21 +++++++------
 backend/plugins/bamboo/api/blueprint_V200_test.go  |  3 +-
 backend/plugins/bamboo/api/blueprint_v200.go       |  5 ++--
 backend/plugins/bamboo/impl/impl.go                |  3 +-
 .../plugins/bitbucket/api/blueprint_V200_test.go   |  3 +-
 backend/plugins/bitbucket/api/blueprint_v200.go    |  8 +----
 backend/plugins/bitbucket/bitbucket.go             |  2 --
 backend/plugins/bitbucket/impl/impl.go             | 17 ++---------
 backend/plugins/bitbucket/tasks/api_common.go      | 14 +++++----
 backend/plugins/bitbucket/tasks/issue_collector.go |  2 +-
 .../bitbucket/tasks/issue_comment_collector.go     |  2 +-
 .../plugins/bitbucket/tasks/pipeline_collector.go  |  2 +-
 .../bitbucket/tasks/pipeline_steps_collector.go    |  2 +-
 backend/plugins/bitbucket/tasks/pr_collector.go    |  2 +-
 .../bitbucket/tasks/pr_comment_collector.go        |  2 +-
 .../plugins/bitbucket/tasks/pr_commit_collector.go |  5 ++--
 backend/plugins/bitbucket/tasks/task_data.go       |  4 ---
 backend/plugins/circleci/api/blueprint200.go       |  8 +----
 backend/plugins/circleci/impl/impl.go              | 14 +--------
 backend/plugins/circleci/tasks/task_data.go        |  4 ---
 backend/plugins/github/api/blueprint_V200_test.go  |  3 +-
 backend/plugins/github/api/blueprint_v200.go       |  8 +----
 backend/plugins/github/github.go                   |  2 --
 backend/plugins/github/impl/impl.go                | 13 +--------
 backend/plugins/github/tasks/cicd_job_collector.go |  2 +-
 backend/plugins/github/tasks/cicd_run_collector.go |  1 -
 backend/plugins/github/tasks/comment_collector.go  |  7 +++--
 backend/plugins/github/tasks/commit_collector.go   |  7 +++--
 backend/plugins/github/tasks/event_collector.go    |  1 -
 backend/plugins/github/tasks/issue_collector.go    |  7 +++--
 backend/plugins/github/tasks/pr_collector.go       |  1 -
 .../plugins/github/tasks/pr_commit_collector.go    |  2 +-
 .../plugins/github/tasks/pr_review_collector.go    |  2 +-
 .../github/tasks/pr_review_comment_collector.go    |  7 +++--
 backend/plugins/github/tasks/task_data.go          |  3 --
 backend/plugins/github_graphql/impl/impl.go        |  9 ------
 backend/plugins/github_graphql/plugin_main.go      |  2 --
 .../github_graphql/tasks/issue_collector.go        |  8 ++---
 .../plugins/github_graphql/tasks/job_collector.go  |  2 +-
 .../plugins/github_graphql/tasks/pr_collector.go   |  5 ++--
 backend/plugins/gitlab/api/blueprint_V200_test.go  |  3 +-
 backend/plugins/gitlab/api/blueprint_v200.go       |  9 ++----
 backend/plugins/gitlab/gitlab.go                   |  2 --
 backend/plugins/gitlab/impl/impl.go                | 16 +---------
 backend/plugins/gitlab/tasks/issue_collector.go    |  7 +++--
 backend/plugins/gitlab/tasks/job_collector.go      |  1 -
 backend/plugins/gitlab/tasks/mr_collector.go       |  7 +++--
 .../plugins/gitlab/tasks/mr_commit_collector.go    |  2 +-
 .../plugins/gitlab/tasks/mr_detail_collector.go    |  7 +++--
 backend/plugins/gitlab/tasks/mr_note_collector.go  |  2 +-
 backend/plugins/gitlab/tasks/pipeline_collector.go |  7 +++--
 .../gitlab/tasks/pipeline_detail_collector.go      |  2 +-
 backend/plugins/gitlab/tasks/task_data.go          |  4 ---
 backend/plugins/jenkins/api/blueprint_v200.go      | 10 +------
 backend/plugins/jenkins/api/blueprint_v200_test.go |  4 +--
 backend/plugins/jenkins/impl/impl.go               | 16 ++--------
 backend/plugins/jenkins/jenkins.go                 |  2 --
 backend/plugins/jenkins/tasks/build_collector.go   |  1 -
 backend/plugins/jenkins/tasks/stage_collector.go   |  6 ++--
 backend/plugins/jenkins/tasks/task_data.go         | 11 +++----
 backend/plugins/jira/api/blueprint_v200.go         |  9 +-----
 backend/plugins/jira/api/blueprint_v200_test.go    |  4 +--
 backend/plugins/jira/impl/impl.go                  | 14 ++-------
 backend/plugins/jira/jira.go                       |  3 +-
 .../jira/tasks/development_panel_collector.go      |  2 +-
 .../jira/tasks/issue_changelog_collector.go        |  2 +-
 backend/plugins/jira/tasks/issue_collector.go      | 16 +++++++---
 .../plugins/jira/tasks/issue_comment_collector.go  |  2 +-
 backend/plugins/jira/tasks/remotelink_collector.go |  2 +-
 backend/plugins/jira/tasks/task_data.go            |  7 ++---
 backend/plugins/jira/tasks/worklog_collector.go    |  2 +-
 backend/plugins/pagerduty/api/blueprint_v200.go    | 10 ++-----
 backend/plugins/pagerduty/impl/impl.go             | 18 +++---------
 .../plugins/pagerduty/tasks/incidents_collector.go |  1 -
 backend/plugins/pagerduty/tasks/task_data.go       |  8 ++---
 backend/plugins/sonarqube/api/blueprint_v200.go    |  4 +--
 .../plugins/sonarqube/api/blueprint_v200_test.go   |  4 +--
 backend/plugins/sonarqube/impl/impl.go             |  3 +-
 backend/plugins/tapd/api/blueprint_v200.go         |  8 +----
 backend/plugins/tapd/api/blueprint_v200_test.go    |  3 +-
 backend/plugins/tapd/impl/impl.go                  | 12 +-------
 .../plugins/tapd/tasks/bug_changelog_collector.go  |  7 +++--
 backend/plugins/tapd/tasks/bug_collector.go        | 11 +++----
 backend/plugins/tapd/tasks/bug_commit_collector.go | 14 +++++----
 backend/plugins/tapd/tasks/iteration_collector.go  |  7 +++--
 backend/plugins/tapd/tasks/story_bug_collector.go  | 12 ++++----
 .../tapd/tasks/story_changelog_collector.go        |  7 +++--
 backend/plugins/tapd/tasks/story_collector.go      |  7 +++--
 .../plugins/tapd/tasks/story_commit_collector.go   | 14 +++++----
 .../plugins/tapd/tasks/task_changelog_collector.go |  8 ++---
 backend/plugins/tapd/tasks/task_collector.go       |  8 ++---
 .../plugins/tapd/tasks/task_commit_collector.go    | 14 +++++----
 backend/plugins/tapd/tasks/task_data.go            |  2 --
 backend/plugins/tapd/tasks/worklog_collector.go    |  8 ++---
 backend/plugins/teambition/api/blueprint200.go     |  8 +----
 backend/plugins/teambition/impl/impl.go            | 15 ++--------
 backend/plugins/teambition/tasks/task_data.go      |  5 ++--
 backend/plugins/trello/api/blueprint_v200.go       | 10 ++-----
 backend/plugins/trello/impl/impl.go                |  3 +-
 backend/plugins/zentao/api/blueprint_V200_test.go  |  3 +-
 backend/plugins/zentao/api/blueprint_v200.go       |  9 +-----
 backend/plugins/zentao/impl/impl.go                |  3 +-
 .../plugins/zentao/tasks/bug_commits_collector.go  |  2 +-
 .../zentao/tasks/story_commits_collector.go        |  2 +-
 .../plugins/zentao/tasks/task_commits_collector.go |  2 +-
 backend/plugins/zentao/tasks/task_data.go          |  3 --
 backend/server/api/blueprints/blueprints.go        |  1 -
 backend/server/services/blueprint.go               | 34 +++++++++-------------
 backend/server/services/blueprint_makeplan_v200.go |  5 ++--
 .../services/blueprint_makeplan_v200_test.go       |  2 +-
 backend/server/services/pipeline_helper.go         |  6 ++--
 .../services/remote/plugin/plugin_extensions.go    |  1 -
 backend/test/helper/api.go                         |  6 ++--
 119 files changed, 280 insertions(+), 485 deletions(-)

diff --git a/backend/core/models/blueprint.go b/backend/core/models/blueprint.go
index 371b97210..b9ff2a457 100644
--- a/backend/core/models/blueprint.go
+++ b/backend/core/models/blueprint.go
@@ -37,13 +37,11 @@ type Blueprint struct {
        Enable       bool                   `json:"enable"`
        CronConfig   string                 `json:"cronConfig" format:"* * * * 
*" example:"0 0 * * 1"`
        IsManual     bool                   `json:"isManual"`
-       SkipOnFail   bool                   `json:"skipOnFail"`
        BeforePlan   PipelinePlan           `json:"beforePlan" 
gorm:"serializer:encdec"`
        AfterPlan    PipelinePlan           `json:"afterPlan" 
gorm:"serializer:encdec"`
-       TimeAfter    *time.Time             `json:"timeAfter"`
-       FullSync     bool                   `json:"fullSync"`
        Labels       []string               `json:"labels" gorm:"-"`
        Connections  []*BlueprintConnection `json:"connections" gorm:"-"`
+       SyncPolicy   `gorm:"embedded"`
        common.Model `swaggerignore:"true"`
 }
 
diff --git a/backend/core/models/migrationscripts/20230907_add_full_sync.go 
b/backend/core/models/migrationscripts/20230911_add_sync_policy.go
similarity index 59%
rename from backend/core/models/migrationscripts/20230907_add_full_sync.go
rename to backend/core/models/migrationscripts/20230911_add_sync_policy.go
index 293ac69a9..c9e91bf59 100644
--- a/backend/core/models/migrationscripts/20230907_add_full_sync.go
+++ b/backend/core/models/migrationscripts/20230911_add_sync_policy.go
@@ -18,41 +18,46 @@ limitations under the License.
 package migrationscripts
 
 import (
+       "time"
+
        "github.com/apache/incubator-devlake/core/context"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/helpers/migrationhelper"
 )
 
-type addFullSyncToBlueprint struct {
-       FullSync bool `json:"fullSync"`
+type addSyncPolicyToBlueprint struct {
+       FullSync       bool `json:"fullSync"`
+       SkipCollectors bool `json:"skipCollectors"`
 }
 
-func (*addFullSyncToBlueprint) TableName() string {
+func (*addSyncPolicyToBlueprint) TableName() string {
        return "_devlake_blueprints"
 }
 
-type addFullSyncToPipeline struct {
-       FullSync bool `json:"fullSync"`
+type addSyncPolicyToPipeline struct {
+       FullSync       bool       `json:"fullSync"`
+       SkipCollectors bool       `json:"skipCollectors"`
+       TimeAfter      *time.Time `json:"timeAfter"`
 }
 
-func (*addFullSyncToPipeline) TableName() string {
+func (*addSyncPolicyToPipeline) TableName() string {
        return "_devlake_pipelines"
 }
 
-type addFullSync struct{}
+type addSyncPolicy struct{}
 
-func (*addFullSync) Up(basicRes context.BasicRes) errors.Error {
+func (*addSyncPolicy) Up(basicRes context.BasicRes) errors.Error {
        return migrationhelper.AutoMigrateTables(
                basicRes,
-               &addFullSyncToBlueprint{},
-               &addFullSyncToPipeline{},
+               &addSyncPolicyToBlueprint{},
+               &addSyncPolicyToPipeline{},
        )
 }
 
-func (*addFullSync) Version() uint64 {
-       return 20230907000041
+func (*addSyncPolicy) Version() uint64 {
+       return 20230911000041
 }
 
-func (*addFullSync) Name() string {
-       return "add full_sync to _devlake_blueprints and _devlake_pipelines 
table"
+func (*addSyncPolicy) Name() string {
+       return "add sync policy to _devlake_blueprints and _devlake_pipelines 
table"
 }
diff --git a/backend/core/models/migrationscripts/register.go 
b/backend/core/models/migrationscripts/register.go
index 7c86af0d4..3874e9d91 100644
--- a/backend/core/models/migrationscripts/register.go
+++ b/backend/core/models/migrationscripts/register.go
@@ -93,6 +93,6 @@ func All() []plugin.MigrationScript {
                new(dropTapStateTable),
                new(addCICDDeploymentsTable),
                new(normalizeBpSettings),
-               new(addFullSync),
+               new(addSyncPolicy),
        }
 }
diff --git a/backend/core/models/pipeline.go b/backend/core/models/pipeline.go
index 1bbb21086..0add6c66f 100644
--- a/backend/core/models/pipeline.go
+++ b/backend/core/models/pipeline.go
@@ -69,8 +69,7 @@ type Pipeline struct {
        SpentSeconds  int          `json:"spentSeconds"`
        Stage         int          `json:"stage"`
        Labels        []string     `json:"labels" gorm:"-"`
-       SkipOnFail    bool         `json:"skipOnFail"`
-       FullSync      bool         `json:"fullSync"`
+       SyncPolicy    `gorm:"embedded"`
 }
 
 // We use a 2D array because the request body must be an array of a set of 
tasks
@@ -79,9 +78,8 @@ type NewPipeline struct {
        Name        string       `json:"name"`
        Plan        PipelinePlan `json:"plan" swaggertype:"array,string" 
example:"please check api /pipelines/<PLUGIN_NAME>/pipeline-plan"`
        Labels      []string     `json:"labels"`
-       SkipOnFail  bool         `json:"skipOnFail"`
-       FullSync    bool         `json:"fullSync"`
        BlueprintId uint64
+       SyncPolicy
 }
 
 func (Pipeline) TableName() string {
diff --git a/backend/core/plugin/plugin_blueprint.go 
b/backend/core/plugin/plugin_blueprint.go
index db3412540..492e54d43 100644
--- a/backend/core/plugin/plugin_blueprint.go
+++ b/backend/core/plugin/plugin_blueprint.go
@@ -72,7 +72,6 @@ type DataSourcePluginBlueprintV200 interface {
        MakeDataSourcePipelinePlanV200(
                connectionId uint64,
                scopes []*models.BlueprintScope,
-               syncPolicy *models.SyncPolicy,
        ) (models.PipelinePlan, []Scope, errors.Error)
 }
 
diff --git a/backend/core/runner/run_task.go b/backend/core/runner/run_task.go
index 009e4f951..96cb18cc3 100644
--- a/backend/core/runner/run_task.go
+++ b/backend/core/runner/run_task.go
@@ -54,8 +54,16 @@ func RunTask(
                return err
        }
        syncPolicy := &models.SyncPolicy{}
-       syncPolicy.SkipOnFail = dbPipeline.SkipOnFail
-       syncPolicy.FullSync = dbPipeline.FullSync
+       blueprint := &models.Blueprint{}
+       if dbPipeline.BlueprintId != 0 {
+               if err := db.First(blueprint, dal.Where("id = ? ", 
dbPipeline.BlueprintId)); err != nil {
+                       return err
+               }
+               syncPolicy = &blueprint.SyncPolicy
+       }
+       if dbPipeline.FullSync {
+               syncPolicy.FullSync = true
+       }
 
        logger, err := getTaskLogger(basicRes.GetLogger(), task)
        if err != nil {
diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go 
b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index 43f48f689..e55d1ca6b 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -37,12 +37,11 @@ type ApiCollectorStateManager struct {
        // *GraphqlCollector
        subtasks     []plugin.SubTask
        LatestState  models.CollectorLatestState
-       TimeAfter    *time.Time
        ExecuteStart time.Time
 }
 
 // NewStatefulApiCollector create a new ApiCollectorStateManager
-func NewStatefulApiCollector(args RawDataSubTaskArgs, timeAfter *time.Time) 
(*ApiCollectorStateManager, errors.Error) {
+func NewStatefulApiCollector(args RawDataSubTaskArgs) 
(*ApiCollectorStateManager, errors.Error) {
        db := args.Ctx.GetDal()
 
        rawDataSubTask, err := NewRawDataSubTask(args)
@@ -64,7 +63,6 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs, 
timeAfter *time.Time) (*Ap
        return &ApiCollectorStateManager{
                RawDataSubTaskArgs: args,
                LatestState:        latestState,
-               TimeAfter:          timeAfter,
                ExecuteStart:       time.Now(),
        }, nil
 }
@@ -73,16 +71,16 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs, 
timeAfter *time.Time) (*Ap
 func (m *ApiCollectorStateManager) IsIncremental() bool {
        prevSyncTime := m.LatestState.LatestSuccessStart
        prevTimeAfter := m.LatestState.TimeAfter
-       currTimeAfter := m.TimeAfter
        syncPolicy := m.Ctx.TaskContext().SyncPolicy()
-
        if syncPolicy != nil && syncPolicy.FullSync {
                return false
        }
+
        if prevSyncTime == nil {
                return false
        }
        // if we cleared the timeAfter, or moved timeAfter back in time, we 
should do a full sync
+       currTimeAfter := syncPolicy.TimeAfter
        if currTimeAfter != nil {
                return prevTimeAfter == nil || 
!currTimeAfter.Before(*prevTimeAfter)
        }
@@ -122,7 +120,11 @@ func (m *ApiCollectorStateManager) Execute() errors.Error {
 
        db := m.Ctx.GetDal()
        m.LatestState.LatestSuccessStart = &m.ExecuteStart
-       m.LatestState.TimeAfter = m.TimeAfter
+       syncPolicy := m.Ctx.TaskContext().SyncPolicy()
+       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+               m.LatestState.TimeAfter = syncPolicy.TimeAfter
+       }
+
        return db.CreateOrUpdate(&m.LatestState)
 }
 
@@ -154,18 +156,19 @@ func NewStatefulApiCollectorForFinalizableEntity(args 
FinalizableApiCollectorArg
                Options: args.Options,
                Params:  args.Params,
                Table:   args.Table,
-       }, args.TimeAfter)
+       })
        if err != nil {
                return nil, err
        }
 
        // // prepare the basic variables
+       syncPolicy := manager.Ctx.TaskContext().SyncPolicy()
        var isIncremental = manager.IsIncremental()
        var createdAfter *time.Time
        if isIncremental {
                createdAfter = manager.LatestState.LatestSuccessStart
-       } else {
-               createdAfter = manager.TimeAfter
+       } else if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+               createdAfter = syncPolicy.TimeAfter
        }
 
        // step 1: create a collector to collect newly added records
diff --git a/backend/plugins/bamboo/api/blueprint_V200_test.go 
b/backend/plugins/bamboo/api/blueprint_V200_test.go
index 5fb5c8a86..bd3d1abe3 100644
--- a/backend/plugins/bamboo/api/blueprint_V200_test.go
+++ b/backend/plugins/bamboo/api/blueprint_V200_test.go
@@ -48,7 +48,6 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
        const testScopeConfigName string = "bamboo scope config"
        const testProxy string = ""
 
-       syncPolicy := &coreModels.SyncPolicy{}
        bpScopes := []*coreModels.BlueprintScope{
                {
                        ScopeId: testKey,
@@ -148,7 +147,7 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
        })
        Init(basicRes, mockMeta)
 
-       plans, scopes, err := MakePipelinePlanV200(testSubTaskMeta, 
testConnectionID, bpScopes, syncPolicy)
+       plans, scopes, err := MakePipelinePlanV200(testSubTaskMeta, 
testConnectionID, bpScopes)
        assert.Equal(t, err, nil)
 
        assert.Equal(t, expectPlans, plans)
diff --git a/backend/plugins/bamboo/api/blueprint_v200.go 
b/backend/plugins/bamboo/api/blueprint_v200.go
index bbd2a3ae8..1b252a2eb 100644
--- a/backend/plugins/bamboo/api/blueprint_v200.go
+++ b/backend/plugins/bamboo/api/blueprint_v200.go
@@ -36,7 +36,6 @@ func MakePipelinePlanV200(
        subtaskMetas []plugin.SubTaskMeta,
        connectionId uint64,
        scope []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
        var err errors.Error
        connection := new(models.BambooConnection)
@@ -50,7 +49,7 @@ func MakePipelinePlanV200(
                return nil, nil, err
        }
 
-       pp, err := makePipelinePlanV200(subtaskMetas, scope, connection, 
syncPolicy)
+       pp, err := makePipelinePlanV200(subtaskMetas, scope, connection)
        if err != nil {
                return nil, nil, err
        }
@@ -84,7 +83,7 @@ func makeScopeV200(connectionId uint64, scopes 
[]*coreModels.BlueprintScope) ([]
 func makePipelinePlanV200(
        subtaskMetas []plugin.SubTaskMeta,
        scopes []*coreModels.BlueprintScope,
-       connection *models.BambooConnection, syncPolicy *coreModels.SyncPolicy,
+       connection *models.BambooConnection,
 ) (coreModels.PipelinePlan, errors.Error) {
        plans := make(coreModels.PipelinePlan, 0, len(scopes))
        for _, scope := range scopes {
diff --git a/backend/plugins/bamboo/impl/impl.go 
b/backend/plugins/bamboo/impl/impl.go
index 69d77b3f4..529089a1e 100644
--- a/backend/plugins/bamboo/impl/impl.go
+++ b/backend/plugins/bamboo/impl/impl.go
@@ -68,9 +68,8 @@ func (p Bamboo) ScopeConfig() dal.Tabler {
 func (p Bamboo) MakeDataSourcePipelinePlanV200(
        connectionId uint64,
        scopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
-       return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes, 
syncPolicy)
+       return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes)
 }
 
 func (p Bamboo) GetTablesInfo() []dal.Tabler {
diff --git a/backend/plugins/bitbucket/api/blueprint_V200_test.go 
b/backend/plugins/bitbucket/api/blueprint_V200_test.go
index 0a878c2e1..eb9d3534f 100644
--- a/backend/plugins/bitbucket/api/blueprint_V200_test.go
+++ b/backend/plugins/bitbucket/api/blueprint_V200_test.go
@@ -68,10 +68,9 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
        }
        bpScopes := make([]*coreModels.BlueprintScope, 0)
        bpScopes = append(bpScopes, bs)
-       syncPolicy := &coreModels.SyncPolicy{}
 
        plan := make(coreModels.PipelinePlan, len(bpScopes))
-       plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes, 
connection, syncPolicy)
+       plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes, 
connection)
        assert.Nil(t, err)
        scopes, err := makeScopesV200(bpScopes, connection)
        assert.Nil(t, err)
diff --git a/backend/plugins/bitbucket/api/blueprint_v200.go 
b/backend/plugins/bitbucket/api/blueprint_v200.go
index e438a4a68..d1b562103 100644
--- a/backend/plugins/bitbucket/api/blueprint_v200.go
+++ b/backend/plugins/bitbucket/api/blueprint_v200.go
@@ -19,7 +19,6 @@ package api
 
 import (
        "net/url"
-       "time"
 
        "github.com/apache/incubator-devlake/core/errors"
        coreModels "github.com/apache/incubator-devlake/core/models"
@@ -39,7 +38,6 @@ func MakeDataSourcePipelinePlanV200(
        subtaskMetas []plugin.SubTaskMeta,
        connectionId uint64,
        bpScopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
        // get the connection info for url
        connection := &models.BitbucketConnection{}
@@ -49,7 +47,7 @@ func MakeDataSourcePipelinePlanV200(
        }
 
        plan := make(coreModels.PipelinePlan, len(bpScopes))
-       plan, err = makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connection, syncPolicy)
+       plan, err = makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connection)
        if err != nil {
                return nil, nil, err
        }
@@ -66,7 +64,6 @@ func makeDataSourcePipelinePlanV200(
        plan coreModels.PipelinePlan,
        bpScopes []*coreModels.BlueprintScope,
        connection *models.BitbucketConnection,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, errors.Error) {
        for i, bpScope := range bpScopes {
                stage := plan[i]
@@ -101,9 +98,6 @@ func makeDataSourcePipelinePlanV200(
                        ConnectionId: repo.ConnectionId,
                        FullName:     repo.BitbucketId,
                }
-               if syncPolicy.TimeAfter != nil {
-                       op.TimeAfter = syncPolicy.TimeAfter.Format(time.RFC3339)
-               }
                options, err := tasks.EncodeTaskOptions(op)
                if err != nil {
                        return nil, err
diff --git a/backend/plugins/bitbucket/bitbucket.go 
b/backend/plugins/bitbucket/bitbucket.go
index fa36b7ad3..6c73f06cf 100644
--- a/backend/plugins/bitbucket/bitbucket.go
+++ b/backend/plugins/bitbucket/bitbucket.go
@@ -31,7 +31,6 @@ func main() {
        cmd := &cobra.Command{Use: "bitbucket"}
        connectionId := cmd.Flags().Uint64P("connectionId", "c", 0, "bitbucket 
connection id")
        fullName := cmd.Flags().StringP("fullName", "n", "", "bitbucket id: 
owner/repo")
-       timeAfter := cmd.Flags().StringP("timeAfter", "a", "", "collect data 
that are created after specified time, ie 2006-05-06T07:08:09Z")
        deploymentPattern := cmd.Flags().StringP("deployment", "", "", 
"deployment pattern")
        productionPattern := cmd.Flags().StringP("production", "", "", 
"production pattern")
        _ = cmd.MarkFlagRequired("connectionId")
@@ -41,7 +40,6 @@ func main() {
                runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
                        "connectionId": *connectionId,
                        "fullName":     *fullName,
-                       "timeAfter":    *timeAfter,
                        "scopeConfig": map[string]string{
                                "deploymentPattern": *deploymentPattern,
                                "productionPattern": *productionPattern,
diff --git a/backend/plugins/bitbucket/impl/impl.go 
b/backend/plugins/bitbucket/impl/impl.go
index 275e97377..57c243418 100644
--- a/backend/plugins/bitbucket/impl/impl.go
+++ b/backend/plugins/bitbucket/impl/impl.go
@@ -19,7 +19,6 @@ package impl
 
 import (
        "fmt"
-       "time"
 
        "github.com/apache/incubator-devlake/core/context"
        "github.com/apache/incubator-devlake/core/dal"
@@ -164,13 +163,6 @@ func (p Bitbucket) PrepareTaskData(taskCtx 
plugin.TaskContext, options map[strin
                return nil, err
        }
 
-       var timeAfter time.Time
-       if op.TimeAfter != "" {
-               timeAfter, err = errors.Convert01(time.Parse(time.RFC3339, 
op.TimeAfter))
-               if err != nil {
-                       return nil, errors.BadInput.Wrap(err, "invalid value 
for `timeAfter`")
-               }
-       }
        regexEnricher := helper.NewRegexEnricher()
        if err := regexEnricher.TryAdd(devops.DEPLOYMENT, 
op.DeploymentPattern); err != nil {
                return nil, errors.BadInput.Wrap(err, "invalid value for 
`deploymentPattern`")
@@ -183,10 +175,6 @@ func (p Bitbucket) PrepareTaskData(taskCtx 
plugin.TaskContext, options map[strin
                ApiClient:     apiClient,
                RegexEnricher: regexEnricher,
        }
-       if !timeAfter.IsZero() {
-               taskData.TimeAfter = &timeAfter
-               logger.Debug("collect data updated timeAfter %s", timeAfter)
-       }
 
        return taskData, nil
 }
@@ -201,9 +189,8 @@ func (p Bitbucket) MigrationScripts() 
[]plugin.MigrationScript {
 
 func (p Bitbucket) MakeDataSourcePipelinePlanV200(
        connectionId uint64,
-       scopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy) (pp coreModels.PipelinePlan, sc 
[]plugin.Scope, err errors.Error) {
-       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes, syncPolicy)
+       scopes []*coreModels.BlueprintScope) (pp coreModels.PipelinePlan, sc 
[]plugin.Scope, err errors.Error) {
+       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes)
 }
 
 func (p Bitbucket) ApiResources() 
map[string]map[string]plugin.ApiResourceHandler {
diff --git a/backend/plugins/bitbucket/tasks/api_common.go 
b/backend/plugins/bitbucket/tasks/api_common.go
index 5aa43a0fe..0910ff82e 100644
--- a/backend/plugins/bitbucket/tasks/api_common.go
+++ b/backend/plugins/bitbucket/tasks/api_common.go
@@ -20,15 +20,16 @@ package tasks
 import (
        "encoding/json"
        "fmt"
-       "github.com/apache/incubator-devlake/core/dal"
-       "github.com/apache/incubator-devlake/core/errors"
-       plugin "github.com/apache/incubator-devlake/core/plugin"
-       "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "io"
        "net/http"
        "net/url"
        "reflect"
        "time"
+
+       "github.com/apache/incubator-devlake/core/dal"
+       "github.com/apache/incubator-devlake/core/errors"
+       plugin "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
 )
 
 type BitbucketApiParams struct {
@@ -100,11 +101,12 @@ func GetQueryCreatedAndUpdated(fields string, 
collectorWithState *api.ApiCollect
                }
                query.Set("fields", fields)
                query.Set("sort", "created_on")
+               syncPolicy := collectorWithState.Ctx.TaskContext().SyncPolicy()
                if collectorWithState.IsIncremental() {
                        latestSuccessStart := 
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339)
                        query.Set("q", fmt.Sprintf("updated_on>=%s", 
latestSuccessStart))
-               } else if collectorWithState.TimeAfter != nil {
-                       timeAfter := 
collectorWithState.TimeAfter.Format(time.RFC3339)
+               } else if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+                       timeAfter := syncPolicy.TimeAfter.Format(time.RFC3339)
                        query.Set("q", fmt.Sprintf("updated_on>=%s", timeAfter))
                }
 
diff --git a/backend/plugins/bitbucket/tasks/issue_collector.go 
b/backend/plugins/bitbucket/tasks/issue_collector.go
index ef5f56fd5..a5f2e4a41 100644
--- a/backend/plugins/bitbucket/tasks/issue_collector.go
+++ b/backend/plugins/bitbucket/tasks/issue_collector.go
@@ -35,7 +35,7 @@ var CollectApiIssuesMeta = plugin.SubTaskMeta{
 
 func CollectApiIssues(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_ISSUE_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
diff --git a/backend/plugins/bitbucket/tasks/issue_comment_collector.go 
b/backend/plugins/bitbucket/tasks/issue_comment_collector.go
index 8f85ceace..8e0036c4a 100644
--- a/backend/plugins/bitbucket/tasks/issue_comment_collector.go
+++ b/backend/plugins/bitbucket/tasks/issue_comment_collector.go
@@ -36,7 +36,7 @@ var CollectApiIssueCommentsMeta = plugin.SubTaskMeta{
 
 func CollectApiIssueComments(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_ISSUE_COMMENTS_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
diff --git a/backend/plugins/bitbucket/tasks/pipeline_collector.go 
b/backend/plugins/bitbucket/tasks/pipeline_collector.go
index 58a5f6a4c..e25861936 100644
--- a/backend/plugins/bitbucket/tasks/pipeline_collector.go
+++ b/backend/plugins/bitbucket/tasks/pipeline_collector.go
@@ -35,7 +35,7 @@ var CollectApiPipelinesMeta = plugin.SubTaskMeta{
 
 func CollectApiPipelines(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_PIPELINE_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
diff --git a/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go 
b/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
index b03fdfefe..99cc1e39b 100644
--- a/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
+++ b/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
@@ -38,7 +38,7 @@ var CollectPipelineStepsMeta = plugin.SubTaskMeta{
 func CollectPipelineSteps(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_PIPELINE_STEPS_TABLE)
 
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
diff --git a/backend/plugins/bitbucket/tasks/pr_collector.go 
b/backend/plugins/bitbucket/tasks/pr_collector.go
index aab2c6b13..7379cc1e4 100644
--- a/backend/plugins/bitbucket/tasks/pr_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_collector.go
@@ -38,7 +38,7 @@ var CollectApiPullRequestsMeta = plugin.SubTaskMeta{
 
 func CollectApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_PULL_REQUEST_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
diff --git a/backend/plugins/bitbucket/tasks/pr_comment_collector.go 
b/backend/plugins/bitbucket/tasks/pr_comment_collector.go
index 70bd44433..c4d539f85 100644
--- a/backend/plugins/bitbucket/tasks/pr_comment_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_comment_collector.go
@@ -36,7 +36,7 @@ var CollectApiPrCommentsMeta = plugin.SubTaskMeta{
 
 func CollectApiPullRequestsComments(taskCtx plugin.SubTaskContext) 
errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_PULL_REQUEST_COMMENTS_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
diff --git a/backend/plugins/bitbucket/tasks/pr_commit_collector.go 
b/backend/plugins/bitbucket/tasks/pr_commit_collector.go
index 14c53d5d2..9d99da39c 100644
--- a/backend/plugins/bitbucket/tasks/pr_commit_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_commit_collector.go
@@ -19,10 +19,11 @@ package tasks
 
 import (
        "fmt"
+       "net/url"
+
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
        helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-       "net/url"
 )
 
 const RAW_PULL_REQUEST_COMMITS_TABLE = "bitbucket_api_pull_request_commits"
@@ -37,7 +38,7 @@ var CollectApiPrCommitsMeta = plugin.SubTaskMeta{
 
 func CollectApiPullRequestCommits(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_PULL_REQUEST_COMMITS_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
diff --git a/backend/plugins/bitbucket/tasks/task_data.go 
b/backend/plugins/bitbucket/tasks/task_data.go
index 6ecb87689..235b05080 100644
--- a/backend/plugins/bitbucket/tasks/task_data.go
+++ b/backend/plugins/bitbucket/tasks/task_data.go
@@ -18,8 +18,6 @@ limitations under the License.
 package tasks
 
 import (
-       "time"
-
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "github.com/apache/incubator-devlake/plugins/bitbucket/models"
@@ -29,7 +27,6 @@ type BitbucketOptions struct {
        ConnectionId                 uint64   `json:"connectionId" 
mapstructure:"connectionId,omitempty"`
        Tasks                        []string `json:"tasks,omitempty" 
mapstructure:",omitempty"`
        FullName                     string   `json:"fullName" 
mapstructure:"fullName"`
-       TimeAfter                    string   `json:"timeAfter" 
mapstructure:"timeAfter,omitempty"`
        ScopeConfigId                uint64   `json:"scopeConfigId" 
mapstructure:"scopeConfigId,omitempty"`
        *models.BitbucketScopeConfig `mapstructure:"scopeConfig,omitempty" 
json:"scopeConfig"`
 }
@@ -37,7 +34,6 @@ type BitbucketOptions struct {
 type BitbucketTaskData struct {
        Options       *BitbucketOptions
        ApiClient     *api.ApiAsyncClient
-       TimeAfter     *time.Time
        RegexEnricher *api.RegexEnricher
 }
 
diff --git a/backend/plugins/circleci/api/blueprint200.go 
b/backend/plugins/circleci/api/blueprint200.go
index 9f5a8d55b..57bb1e6ed 100644
--- a/backend/plugins/circleci/api/blueprint200.go
+++ b/backend/plugins/circleci/api/blueprint200.go
@@ -19,7 +19,6 @@ package api
 
 import (
        "fmt"
-       "time"
 
        "github.com/apache/incubator-devlake/core/models/domainlayer/devops"
        "github.com/apache/incubator-devlake/plugins/circleci/models"
@@ -38,10 +37,9 @@ func MakeDataSourcePipelinePlanV200(
        subtaskMetas []plugin.SubTaskMeta,
        connectionId uint64,
        bpScopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
        plan := make(coreModels.PipelinePlan, len(bpScopes))
-       plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connectionId, syncPolicy)
+       plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connectionId)
        if err != nil {
                return nil, nil, err
        }
@@ -58,7 +56,6 @@ func makeDataSourcePipelinePlanV200(
        plan coreModels.PipelinePlan,
        bpScopes []*coreModels.BlueprintScope,
        connectionId uint64,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, errors.Error) {
        for i, bpScope := range bpScopes {
                stage := plan[i]
@@ -73,9 +70,6 @@ func makeDataSourcePipelinePlanV200(
                }
                options["projectSlug"] = project.Slug
                options["connectionId"] = connectionId
-               if syncPolicy.TimeAfter != nil {
-                       options["timeAfter"] = 
syncPolicy.TimeAfter.Format(time.RFC3339)
-               }
 
                // get scope config from db
                _, scopeConfig, err := 
scopeHelper.DbHelper().GetScopeAndConfig(connectionId, bpScope.ScopeId)
diff --git a/backend/plugins/circleci/impl/impl.go 
b/backend/plugins/circleci/impl/impl.go
index 6d7a97601..df9d9605b 100644
--- a/backend/plugins/circleci/impl/impl.go
+++ b/backend/plugins/circleci/impl/impl.go
@@ -19,7 +19,6 @@ package impl
 
 import (
        "fmt"
-       "time"
 
        "github.com/apache/incubator-devlake/core/context"
        "github.com/apache/incubator-devlake/core/dal"
@@ -90,9 +89,8 @@ func (p Circleci) SubTaskMetas() []plugin.SubTaskMeta {
 func (p Circleci) MakeDataSourcePipelinePlanV200(
        connectionId uint64,
        scopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
-       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes, syncPolicy)
+       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes)
 }
 
 func (p Circleci) PrepareTaskData(taskCtx plugin.TaskContext, options 
map[string]interface{}) (interface{}, errors.Error) {
@@ -119,16 +117,6 @@ func (p Circleci) PrepareTaskData(taskCtx 
plugin.TaskContext, options map[string
                Options:   op,
                ApiClient: apiClient,
        }
-       var createdDateAfter time.Time
-       if op.TimeAfter != "" {
-               createdDateAfter, err = 
errors.Convert01(time.Parse(time.RFC3339, op.TimeAfter))
-               if err != nil {
-                       return nil, errors.BadInput.Wrap(err, "invalid value 
for `createdDateAfter`")
-               }
-       }
-       if !createdDateAfter.IsZero() {
-               taskData.TimeAfter = &createdDateAfter
-       }
        // fallback to project scope config
        if op.ScopeConfigId == 0 {
                project := &models.CircleciProject{}
diff --git a/backend/plugins/circleci/tasks/task_data.go 
b/backend/plugins/circleci/tasks/task_data.go
index 6f5e3daf1..5c3028694 100644
--- a/backend/plugins/circleci/tasks/task_data.go
+++ b/backend/plugins/circleci/tasks/task_data.go
@@ -18,8 +18,6 @@ limitations under the License.
 package tasks
 
 import (
-       "time"
-
        "github.com/apache/incubator-devlake/core/errors"
        helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "github.com/apache/incubator-devlake/plugins/circleci/models"
@@ -29,7 +27,6 @@ type CircleciOptions struct {
        ConnectionId  uint64                      `json:"connectionId"`
        ProjectSlug   string                      `json:"projectSlug"`
        PageSize      uint64                      `mapstruct:"pageSize"`
-       TimeAfter     string                      `json:"timeAfter" 
mapstructure:"timeAfter,omitempty"`
        ScopeConfigId uint64                      `json:"scopeConfigId" 
mapstructure:"scopeConfigId,omitempty"`
        ScopeConfig   *models.CircleciScopeConfig `json:"scopeConfig" 
mapstructure:"scopeConfig,omitempty"`
 }
@@ -37,7 +34,6 @@ type CircleciOptions struct {
 type CircleciTaskData struct {
        Options       *CircleciOptions
        ApiClient     *helper.ApiAsyncClient
-       TimeAfter     *time.Time
        RegexEnricher *helper.RegexEnricher
 }
 
diff --git a/backend/plugins/github/api/blueprint_V200_test.go 
b/backend/plugins/github/api/blueprint_V200_test.go
index 718bcb5b2..96c4c5ce0 100644
--- a/backend/plugins/github/api/blueprint_V200_test.go
+++ b/backend/plugins/github/api/blueprint_V200_test.go
@@ -69,10 +69,9 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
        }
        bpScopes := make([]*coreModels.BlueprintScope, 0)
        bpScopes = append(bpScopes, bs)
-       syncPolicy := &coreModels.SyncPolicy{}
 
        plan := make(coreModels.PipelinePlan, len(bpScopes))
-       plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes, 
connection, syncPolicy)
+       plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes, 
connection)
        assert.Nil(t, err)
        scopes, err := makeScopesV200(bpScopes, connection)
        assert.Nil(t, err)
diff --git a/backend/plugins/github/api/blueprint_v200.go 
b/backend/plugins/github/api/blueprint_v200.go
index e405e52c3..b6b508930 100644
--- a/backend/plugins/github/api/blueprint_v200.go
+++ b/backend/plugins/github/api/blueprint_v200.go
@@ -25,7 +25,6 @@ import (
        "net/http"
        "net/url"
        "strings"
-       "time"
 
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
@@ -47,7 +46,6 @@ func MakeDataSourcePipelinePlanV200(
        subtaskMetas []plugin.SubTaskMeta,
        connectionId uint64,
        bpScopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
        // get the connection info for url
        connection := &models.GithubConnection{}
@@ -64,7 +62,7 @@ func MakeDataSourcePipelinePlanV200(
        }
 
        plan := make(coreModels.PipelinePlan, len(bpScopes))
-       plan, err = makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connection, syncPolicy)
+       plan, err = makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connection)
        if err != nil {
                return nil, nil, err
        }
@@ -81,7 +79,6 @@ func makeDataSourcePipelinePlanV200(
        plan coreModels.PipelinePlan,
        bpScopes []*coreModels.BlueprintScope,
        connection *models.GithubConnection,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, errors.Error) {
        for i, bpScope := range bpScopes {
                stage := plan[i]
@@ -117,9 +114,6 @@ func makeDataSourcePipelinePlanV200(
                        GithubId:     githubRepo.GithubId,
                        Name:         githubRepo.FullName,
                }
-               if syncPolicy.TimeAfter != nil {
-                       op.TimeAfter = syncPolicy.TimeAfter.Format(time.RFC3339)
-               }
                options, err := tasks.EncodeTaskOptions(op)
                if err != nil {
                        return nil, err
diff --git a/backend/plugins/github/github.go b/backend/plugins/github/github.go
index 5dd7f8c03..de811aadf 100644
--- a/backend/plugins/github/github.go
+++ b/backend/plugins/github/github.go
@@ -32,7 +32,6 @@ func main() {
        connectionId := cmd.Flags().Uint64P("connectionId", "c", 0, "github 
connection id")
        owner := cmd.Flags().StringP("owner", "o", "", "github owner")
        repo := cmd.Flags().StringP("repo", "r", "", "github repo")
-       timeAfter := cmd.Flags().StringP("timeAfter", "a", "", "collect data 
that are created after specified time, ie 2006-05-06T07:08:09Z")
        _ = cmd.MarkFlagRequired("connectionId")
        _ = cmd.MarkFlagRequired("owner")
        _ = cmd.MarkFlagRequired("repo")
@@ -54,7 +53,6 @@ func main() {
                        "connectionId": *connectionId,
                        "owner":        *owner,
                        "repo":         *repo,
-                       "timeAfter":    *timeAfter,
                        "scopeConfig": map[string]interface{}{
                                "prType":               *prType,
                                "prComponent":          *prComponent,
diff --git a/backend/plugins/github/impl/impl.go 
b/backend/plugins/github/impl/impl.go
index 12481fe0e..2a2d0d4ed 100644
--- a/backend/plugins/github/impl/impl.go
+++ b/backend/plugins/github/impl/impl.go
@@ -19,7 +19,6 @@ package impl
 
 import (
        "fmt"
-       "time"
 
        
"github.com/apache/incubator-devlake/helpers/pluginhelper/subtaskmeta/sorter"
 
@@ -159,15 +158,6 @@ func (p Github) PrepareTaskData(taskCtx 
plugin.TaskContext, options map[string]i
                RegexEnricher: regexEnricher,
        }
 
-       if op.TimeAfter != "" {
-               var timeAfter time.Time
-               timeAfter, err = errors.Convert01(time.Parse(time.RFC3339, 
op.TimeAfter))
-               if err != nil {
-                       return nil, errors.BadInput.Wrap(err, "invalid value 
for `timeAfter`")
-               }
-               taskData.TimeAfter = &timeAfter
-               logger.Debug("collect data updated timeAfter %s", timeAfter)
-       }
        return taskData, nil
 }
 
@@ -226,9 +216,8 @@ func (p Github) ApiResources() 
map[string]map[string]plugin.ApiResourceHandler {
 func (p Github) MakeDataSourcePipelinePlanV200(
        connectionId uint64,
        scopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
-       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes, syncPolicy)
+       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes)
 }
 
 func (p Github) Close(taskCtx plugin.TaskContext) errors.Error {
diff --git a/backend/plugins/github/tasks/cicd_job_collector.go 
b/backend/plugins/github/tasks/cicd_job_collector.go
index 787476c31..73fa2ab0d 100644
--- a/backend/plugins/github/tasks/cicd_job_collector.go
+++ b/backend/plugins/github/tasks/cicd_job_collector.go
@@ -59,7 +59,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
                        Name:         data.Options.Name,
                },
                Table: RAW_JOB_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
diff --git a/backend/plugins/github/tasks/cicd_run_collector.go 
b/backend/plugins/github/tasks/cicd_run_collector.go
index 4d9158fcf..f29958575 100644
--- a/backend/plugins/github/tasks/cicd_run_collector.go
+++ b/backend/plugins/github/tasks/cicd_run_collector.go
@@ -78,7 +78,6 @@ func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error {
                        Table: RAW_RUN_TABLE,
                },
                ApiClient: data.ApiClient,
-               TimeAfter: data.TimeAfter,
                CollectNewRecordsByList: helper.FinalizableApiCollectorListArgs{
                        PageSize:    PAGE_SIZE,
                        Concurrency: 10,
diff --git a/backend/plugins/github/tasks/comment_collector.go 
b/backend/plugins/github/tasks/comment_collector.go
index 402bfa4e8..21e57f853 100644
--- a/backend/plugins/github/tasks/comment_collector.go
+++ b/backend/plugins/github/tasks/comment_collector.go
@@ -53,12 +53,13 @@ func CollectApiComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                        Name:         data.Options.Name,
                },
                Table: RAW_COMMENTS_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
 
        incremental := collectorWithState.IsIncremental()
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
@@ -68,11 +69,11 @@ func CollectApiComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("state", "all")
-                       if data.TimeAfter != nil {
+                       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
                                // Note that `since` is for filtering records 
by the `updated` time
                                // which is not ideal for semantic reasons and 
would result in slightly more records than expected.
                                // But we have no choice since it is the only 
available field we could exploit from the API.
-                               query.Set("since", data.TimeAfter.String())
+                               query.Set("since", 
syncPolicy.TimeAfter.String())
                        }
                        // if incremental == true, we overwrite it
                        if incremental {
diff --git a/backend/plugins/github/tasks/commit_collector.go 
b/backend/plugins/github/tasks/commit_collector.go
index 33e005331..c8ed10780 100644
--- a/backend/plugins/github/tasks/commit_collector.go
+++ b/backend/plugins/github/tasks/commit_collector.go
@@ -53,12 +53,13 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                        Name:         data.Options.Name,
                },
                Table: RAW_COMMIT_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
 
        incremental := collectorWithState.IsIncremental()
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
@@ -79,8 +80,8 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("state", "all")
-                       if data.TimeAfter != nil {
-                               query.Set("since", data.TimeAfter.String())
+                       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+                               query.Set("since", 
syncPolicy.TimeAfter.String())
                        }
                        if incremental {
                                query.Set("since", 
collectorWithState.LatestState.LatestSuccessStart.String())
diff --git a/backend/plugins/github/tasks/event_collector.go 
b/backend/plugins/github/tasks/event_collector.go
index 7652f068f..491cc021c 100644
--- a/backend/plugins/github/tasks/event_collector.go
+++ b/backend/plugins/github/tasks/event_collector.go
@@ -69,7 +69,6 @@ func CollectApiEvents(taskCtx plugin.SubTaskContext) 
errors.Error {
                        Table: RAW_EVENTS_TABLE,
                },
                ApiClient: data.ApiClient,
-               TimeAfter: data.TimeAfter, // set to nil to disable timeFilter
                CollectNewRecordsByList: helper.FinalizableApiCollectorListArgs{
                        PageSize:    100,
                        Concurrency: 10,
diff --git a/backend/plugins/github/tasks/issue_collector.go 
b/backend/plugins/github/tasks/issue_collector.go
index f88942319..551081c2d 100644
--- a/backend/plugins/github/tasks/issue_collector.go
+++ b/backend/plugins/github/tasks/issue_collector.go
@@ -53,12 +53,13 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                        Name:         data.Options.Name,
                },
                Table: RAW_ISSUE_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
 
        incremental := collectorWithState.IsIncremental()
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
@@ -79,8 +80,8 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
                        query.Set("state", "all")
-                       if data.TimeAfter != nil {
-                               query.Set("since", data.TimeAfter.String())
+                       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+                               query.Set("since", 
syncPolicy.TimeAfter.String())
                        }
                        if incremental {
                                query.Set("since", 
collectorWithState.LatestState.LatestSuccessStart.String())
diff --git a/backend/plugins/github/tasks/pr_collector.go 
b/backend/plugins/github/tasks/pr_collector.go
index f94e96539..a83cd3085 100644
--- a/backend/plugins/github/tasks/pr_collector.go
+++ b/backend/plugins/github/tasks/pr_collector.go
@@ -72,7 +72,6 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext) 
errors.Error {
                        Table: RAW_PULL_REQUEST_TABLE,
                },
                ApiClient: data.ApiClient,
-               TimeAfter: data.TimeAfter, // set to nil to disable timeFilter
                CollectNewRecordsByList: helper.FinalizableApiCollectorListArgs{
                        PageSize:    100,
                        Concurrency: 10,
diff --git a/backend/plugins/github/tasks/pr_commit_collector.go 
b/backend/plugins/github/tasks/pr_commit_collector.go
index 30ac310e8..0a209ff2e 100644
--- a/backend/plugins/github/tasks/pr_commit_collector.go
+++ b/backend/plugins/github/tasks/pr_commit_collector.go
@@ -68,7 +68,7 @@ func CollectApiPullRequestCommits(taskCtx 
plugin.SubTaskContext) errors.Error {
                        Name:         data.Options.Name,
                },
                Table: RAW_PR_COMMIT_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
diff --git a/backend/plugins/github/tasks/pr_review_collector.go 
b/backend/plugins/github/tasks/pr_review_collector.go
index 268f565ce..18bbde6b3 100644
--- a/backend/plugins/github/tasks/pr_review_collector.go
+++ b/backend/plugins/github/tasks/pr_review_collector.go
@@ -60,7 +60,7 @@ func CollectApiPullRequestReviews(taskCtx 
plugin.SubTaskContext) errors.Error {
                        Name:         data.Options.Name,
                },
                Table: RAW_PR_REVIEW_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
diff --git a/backend/plugins/github/tasks/pr_review_comment_collector.go 
b/backend/plugins/github/tasks/pr_review_comment_collector.go
index 48661a335..bd03b032b 100644
--- a/backend/plugins/github/tasks/pr_review_comment_collector.go
+++ b/backend/plugins/github/tasks/pr_review_comment_collector.go
@@ -56,12 +56,13 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                        Name:         data.Options.Name,
                },
                Table: RAW_PR_REVIEW_COMMENTS_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
 
        incremental := collectorWithState.IsIncremental()
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
@@ -76,11 +77,11 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                UrlTemplate: "repos/{{ .Params.Name }}/pulls/comments",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
-                       if data.TimeAfter != nil {
+                       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
                                // Note that `since` is for filtering records 
by the `updated` time
                                // which is not ideal for semantic reasons and 
would result in slightly more records than expected.
                                // But we have no choice since it is the only 
available field we could exploit from the API.
-                               query.Set("since", data.TimeAfter.String())
+                               query.Set("since", 
syncPolicy.TimeAfter.String())
                        }
                        // if incremental == true, we overwrite it
                        if incremental {
diff --git a/backend/plugins/github/tasks/task_data.go 
b/backend/plugins/github/tasks/task_data.go
index 8dcc777f8..714dbbd95 100644
--- a/backend/plugins/github/tasks/task_data.go
+++ b/backend/plugins/github/tasks/task_data.go
@@ -20,7 +20,6 @@ package tasks
 import (
        "fmt"
        "strings"
-       "time"
 
        "github.com/apache/incubator-devlake/core/errors"
        helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
@@ -31,7 +30,6 @@ type GithubOptions struct {
        ConnectionId  uint64                    `json:"connectionId" 
mapstructure:"connectionId,omitempty"`
        ScopeConfigId uint64                    `json:"scopeConfigId" 
mapstructure:"scopeConfigId,omitempty"`
        GithubId      int                       `json:"githubId" 
mapstructure:"githubId,omitempty"`
-       TimeAfter     string                    `json:"timeAfter" 
mapstructure:"timeAfter,omitempty"`
        Owner         string                    `json:"owner" 
mapstructure:"owner,omitempty"`
        Repo          string                    `json:"repo"  
mapstructure:"repo,omitempty"`
        Name          string                    `json:"name"  
mapstructure:"name,omitempty"`
@@ -42,7 +40,6 @@ type GithubTaskData struct {
        Options       *GithubOptions
        ApiClient     *helper.ApiAsyncClient
        GraphqlClient *helper.GraphqlAsyncClient
-       TimeAfter     *time.Time
        RegexEnricher *helper.RegexEnricher
 }
 
diff --git a/backend/plugins/github_graphql/impl/impl.go 
b/backend/plugins/github_graphql/impl/impl.go
index a1ef349a5..cf39093a3 100644
--- a/backend/plugins/github_graphql/impl/impl.go
+++ b/backend/plugins/github_graphql/impl/impl.go
@@ -232,15 +232,6 @@ func (p GithubGraphql) PrepareTaskData(taskCtx 
plugin.TaskContext, options map[s
                GraphqlClient: graphqlClient,
                RegexEnricher: regexEnricher,
        }
-       if op.TimeAfter != "" {
-               var timeAfter time.Time
-               timeAfter, err = errors.Convert01(time.Parse(time.RFC3339, 
op.TimeAfter))
-               if err != nil {
-                       return nil, errors.BadInput.Wrap(err, "invalid value 
for `timeAfter`")
-               }
-               taskData.TimeAfter = &timeAfter
-               logger.Debug("collect data updated timeAfter %s", timeAfter)
-       }
 
        return taskData, nil
 }
diff --git a/backend/plugins/github_graphql/plugin_main.go 
b/backend/plugins/github_graphql/plugin_main.go
index e9edf721b..3f1777979 100644
--- a/backend/plugins/github_graphql/plugin_main.go
+++ b/backend/plugins/github_graphql/plugin_main.go
@@ -32,7 +32,6 @@ func main() {
        connectionId := cmd.Flags().Uint64P("connectionId", "c", 0, "github 
connection id")
        owner := cmd.Flags().StringP("owner", "o", "", "github owner")
        repo := cmd.Flags().StringP("repo", "r", "", "github repo")
-       timeAfter := cmd.Flags().StringP("timeAfter", "a", "", "collect data 
that are updated/created after specified time, ie 2006-05-06T07:08:09Z")
        _ = cmd.MarkFlagRequired("connectionId")
        _ = cmd.MarkFlagRequired("owner")
        _ = cmd.MarkFlagRequired("repo")
@@ -42,7 +41,6 @@ func main() {
                        "connectionId": *connectionId,
                        "owner":        *owner,
                        "repo":         *repo,
-                       "timeAfter":    *timeAfter,
                })
        }
        runner.RunCmd(cmd)
diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go 
b/backend/plugins/github_graphql/tasks/issue_collector.go
index 768822494..ece8351ba 100644
--- a/backend/plugins/github_graphql/tasks/issue_collector.go
+++ b/backend/plugins/github_graphql/tasks/issue_collector.go
@@ -104,13 +104,13 @@ func CollectIssue(taskCtx plugin.SubTaskContext) 
errors.Error {
                        Name:         data.Options.Name,
                },
                Table: RAW_ISSUES_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
 
        incremental := collectorWithState.IsIncremental()
-
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        err = 
collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
                GraphqlClient: data.GraphqlClient,
                PageSize:      100,
@@ -123,8 +123,8 @@ func CollectIssue(taskCtx plugin.SubTaskContext) 
errors.Error {
                        since := helper.DateTime{}
                        if incremental {
                                since = helper.DateTime{Time: 
*collectorWithState.LatestState.LatestSuccessStart}
-                       } else if collectorWithState.TimeAfter != nil {
-                               since = helper.DateTime{Time: 
*collectorWithState.TimeAfter}
+                       } else if syncPolicy != nil && syncPolicy.TimeAfter != 
nil {
+                               since = helper.DateTime{Time: 
*syncPolicy.TimeAfter}
                        }
                        ownerName := strings.Split(data.Options.Name, "/")
                        variables := map[string]interface{}{
diff --git a/backend/plugins/github_graphql/tasks/job_collector.go 
b/backend/plugins/github_graphql/tasks/job_collector.go
index 805a86c5e..095276c4a 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -111,7 +111,7 @@ func CollectGraphqlJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        Name:         data.Options.Name,
                },
                Table: RAW_GRAPHQL_JOBS_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go 
b/backend/plugins/github_graphql/tasks/pr_collector.go
index cc854c7f0..ae79c5a37 100644
--- a/backend/plugins/github_graphql/tasks/pr_collector.go
+++ b/backend/plugins/github_graphql/tasks/pr_collector.go
@@ -155,12 +155,13 @@ func CollectPr(taskCtx plugin.SubTaskContext) 
errors.Error {
                        Name:         data.Options.Name,
                },
                Table: RAW_PRS_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
 
        incremental := collectorWithState.IsIncremental()
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
 
        err = collectorWithState.InitGraphQLCollector(api.GraphqlCollectorArgs{
                GraphqlClient: data.GraphqlClient,
@@ -195,7 +196,7 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
                        isFinish := false
                        for _, rawL := range prs {
                                // collect data even though in increment mode 
because of updating existing data
-                               if collectorWithState.TimeAfter != nil && 
!collectorWithState.TimeAfter.Before(rawL.UpdatedAt) {
+                               if syncPolicy != nil && syncPolicy.TimeAfter != 
nil && !syncPolicy.TimeAfter.Before(rawL.UpdatedAt) {
                                        isFinish = true
                                        break
                                }
diff --git a/backend/plugins/gitlab/api/blueprint_V200_test.go 
b/backend/plugins/gitlab/api/blueprint_V200_test.go
index 99bc27062..aa366b73b 100644
--- a/backend/plugins/gitlab/api/blueprint_V200_test.go
+++ b/backend/plugins/gitlab/api/blueprint_V200_test.go
@@ -52,7 +52,6 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
        const testScopeConfigName string = "gitlab scope config"
        const testProxy string = ""
 
-       syncPolicy := &coreModels.SyncPolicy{}
        bpScopes := []*coreModels.BlueprintScope{
                {
                        ScopeId: strconv.Itoa(testID),
@@ -201,7 +200,7 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
        })
        Init(mockRes, mockMeta)
 
-       plans, scopes, err := MakePipelinePlanV200(testSubTaskMeta, 
testConnectionID, bpScopes, syncPolicy)
+       plans, scopes, err := MakePipelinePlanV200(testSubTaskMeta, 
testConnectionID, bpScopes)
        assert.Equal(t, err, nil)
 
        assert.Equal(t, expectPlans, plans)
diff --git a/backend/plugins/gitlab/api/blueprint_v200.go 
b/backend/plugins/gitlab/api/blueprint_v200.go
index d7197cb5d..7b6fba3cb 100644
--- a/backend/plugins/gitlab/api/blueprint_v200.go
+++ b/backend/plugins/gitlab/api/blueprint_v200.go
@@ -24,7 +24,6 @@ import (
        "net/http"
        "net/url"
        "strconv"
-       "time"
 
        "github.com/apache/incubator-devlake/plugins/gitlab/tasks"
 
@@ -46,7 +45,6 @@ func MakePipelinePlanV200(
        subtaskMetas []plugin.SubTaskMeta,
        connectionId uint64,
        scope []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
        var err errors.Error
        connection := new(models.GitlabConnection)
@@ -60,7 +58,7 @@ func MakePipelinePlanV200(
                return nil, nil, err
        }
 
-       pp, err := makePipelinePlanV200(subtaskMetas, scope, connection, 
syncPolicy)
+       pp, err := makePipelinePlanV200(subtaskMetas, scope, connection)
        if err != nil {
                return nil, nil, err
        }
@@ -114,7 +112,7 @@ func makeScopeV200(connectionId uint64, scopes 
[]*coreModels.BlueprintScope) ([]
 func makePipelinePlanV200(
        subtaskMetas []plugin.SubTaskMeta,
        scopes []*coreModels.BlueprintScope,
-       connection *models.GitlabConnection, syncPolicy *coreModels.SyncPolicy,
+       connection *models.GitlabConnection,
 ) (coreModels.PipelinePlan, errors.Error) {
        plans := make(coreModels.PipelinePlan, 0, 3*len(scopes))
        for _, scope := range scopes {
@@ -137,9 +135,6 @@ func makePipelinePlanV200(
                options["connectionId"] = connection.ID
                options["projectId"] = intScopeId
                options["scopeConfigId"] = scopeConfig.ID
-               if syncPolicy.TimeAfter != nil {
-                       options["timeAfter"] = 
syncPolicy.TimeAfter.Format(time.RFC3339)
-               }
 
                // construct subtasks
                subtasks, err := helper.MakePipelinePlanSubtasks(subtaskMetas, 
scopeConfig.Entities)
diff --git a/backend/plugins/gitlab/gitlab.go b/backend/plugins/gitlab/gitlab.go
index 5a6d2701c..86f3bac9d 100644
--- a/backend/plugins/gitlab/gitlab.go
+++ b/backend/plugins/gitlab/gitlab.go
@@ -31,7 +31,6 @@ func main() {
        cmd := &cobra.Command{Use: "gitlab"}
        projectId := cmd.Flags().IntP("project-id", "p", 0, "gitlab project id")
        connectionId := cmd.Flags().Uint64P("connection-id", "c", 0, "gitlab 
connection id")
-       timeAfter := cmd.Flags().StringP("timeAfter", "a", "", "collect data 
that are created after specified time, ie 2006-05-06T07:08:09Z")
        _ = cmd.MarkFlagRequired("project-id")
        _ = cmd.MarkFlagRequired("connection-id")
 
@@ -50,7 +49,6 @@ func main() {
                runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
                        "projectId":    *projectId,
                        "connectionId": *connectionId,
-                       "timeAfter":    *timeAfter,
                        "scopeConfig": map[string]interface{}{
                                "prType":               *prType,
                                "prComponent":          *prComponent,
diff --git a/backend/plugins/gitlab/impl/impl.go 
b/backend/plugins/gitlab/impl/impl.go
index 36c7c2ac2..1233802cd 100644
--- a/backend/plugins/gitlab/impl/impl.go
+++ b/backend/plugins/gitlab/impl/impl.go
@@ -19,7 +19,6 @@ package impl
 
 import (
        "fmt"
-       "time"
 
        
"github.com/apache/incubator-devlake/helpers/pluginhelper/subtaskmeta/sorter"
 
@@ -77,9 +76,8 @@ func (p Gitlab) ScopeConfig() dal.Tabler {
 func (p Gitlab) MakeDataSourcePipelinePlanV200(
        connectionId uint64,
        scopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
-       return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes, 
syncPolicy)
+       return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes)
 }
 
 func (p Gitlab) GetTablesInfo() []dal.Tabler {
@@ -151,14 +149,6 @@ func (p Gitlab) PrepareTaskData(taskCtx 
plugin.TaskContext, options map[string]i
                return nil, err
        }
 
-       var timeAfter time.Time
-       if op.TimeAfter != "" {
-               timeAfter, err = errors.Convert01(time.Parse(time.RFC3339, 
op.TimeAfter))
-               if err != nil {
-                       return nil, errors.BadInput.Wrap(err, "invalid value 
for `timeAfter`")
-               }
-       }
-
        if op.ProjectId != 0 {
                var scope *models.GitlabProject
                // support v100 & advance mode
@@ -212,10 +202,6 @@ func (p Gitlab) PrepareTaskData(taskCtx 
plugin.TaskContext, options map[string]i
                RegexEnricher: regexEnricher,
        }
 
-       if !timeAfter.IsZero() {
-               taskData.TimeAfter = &timeAfter
-               logger.Debug("collect data updated timeAfter %s", timeAfter)
-       }
        return &taskData, nil
 }
 
diff --git a/backend/plugins/gitlab/tasks/issue_collector.go 
b/backend/plugins/gitlab/tasks/issue_collector.go
index a8b51c5c6..425adc9f9 100644
--- a/backend/plugins/gitlab/tasks/issue_collector.go
+++ b/backend/plugins/gitlab/tasks/issue_collector.go
@@ -46,12 +46,13 @@ var CollectApiIssuesMeta = plugin.SubTaskMeta{
 
 func CollectApiIssues(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_ISSUE_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
 
        incremental := collectorWithState.IsIncremental()
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
                PageSize:    100,
@@ -63,8 +64,8 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                */
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("updated_after", 
collectorWithState.TimeAfter.Format(time.RFC3339))
+                       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+                               query.Set("updated_after", 
syncPolicy.TimeAfter.Format(time.RFC3339))
                        }
                        if incremental {
                                query.Set("updated_after", 
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
diff --git a/backend/plugins/gitlab/tasks/job_collector.go 
b/backend/plugins/gitlab/tasks/job_collector.go
index 6a7475784..755f8800b 100644
--- a/backend/plugins/gitlab/tasks/job_collector.go
+++ b/backend/plugins/gitlab/tasks/job_collector.go
@@ -55,7 +55,6 @@ func CollectApiJobs(taskCtx plugin.SubTaskContext) 
errors.Error {
        collector, err := 
helper.NewStatefulApiCollectorForFinalizableEntity(helper.FinalizableApiCollectorArgs{
                RawDataSubTaskArgs: *rawDataSubTaskArgs,
                ApiClient:          data.ApiClient,
-               TimeAfter:          data.TimeAfter, // set to nil to disable 
timeFilter
                CollectNewRecordsByList: helper.FinalizableApiCollectorListArgs{
                        PageSize:    100,
                        Concurrency: 10,
diff --git a/backend/plugins/gitlab/tasks/mr_collector.go 
b/backend/plugins/gitlab/tasks/mr_collector.go
index 125262e60..ef991ae01 100644
--- a/backend/plugins/gitlab/tasks/mr_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_collector.go
@@ -43,12 +43,13 @@ var CollectApiMergeRequestsMeta = plugin.SubTaskMeta{
 
 func CollectApiMergeRequests(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_MERGE_REQUEST_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
 
        incremental := collectorWithState.IsIncremental()
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                ApiClient:      data.ApiClient,
                PageSize:       100,
@@ -61,8 +62,8 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext) 
errors.Error {
                        if err != nil {
                                return nil, err
                        }
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("updated_after", 
collectorWithState.TimeAfter.Format(time.RFC3339))
+                       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+                               query.Set("updated_after", 
syncPolicy.TimeAfter.Format(time.RFC3339))
                        }
                        if incremental {
                                query.Set("updated_after", 
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
diff --git a/backend/plugins/gitlab/tasks/mr_commit_collector.go 
b/backend/plugins/gitlab/tasks/mr_commit_collector.go
index f84a58f1d..2fbbdd48a 100644
--- a/backend/plugins/gitlab/tasks/mr_commit_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_commit_collector.go
@@ -40,7 +40,7 @@ var CollectApiMrCommitsMeta = plugin.SubTaskMeta{
 
 func CollectApiMergeRequestsCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_MERGE_REQUEST_COMMITS_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
diff --git a/backend/plugins/gitlab/tasks/mr_detail_collector.go 
b/backend/plugins/gitlab/tasks/mr_detail_collector.go
index 0b3971d3f..397d7f91f 100644
--- a/backend/plugins/gitlab/tasks/mr_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_detail_collector.go
@@ -44,7 +44,7 @@ var CollectApiMergeRequestDetailsMeta = plugin.SubTaskMeta{
 
 func CollectApiMergeRequestDetails(taskCtx plugin.SubTaskContext) errors.Error 
{
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_MERGE_REQUEST_DETAIL_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
@@ -77,6 +77,7 @@ func CollectApiMergeRequestDetails(taskCtx 
plugin.SubTaskContext) errors.Error {
 func GetMergeRequestDetailsIterator(taskCtx plugin.SubTaskContext, 
collectorWithState *helper.ApiCollectorStateManager) 
(*helper.DalCursorIterator, errors.Error) {
        db := taskCtx.GetDal()
        data := taskCtx.GetData().(*GitlabTaskData)
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        clauses := []dal.Clause{
                dal.Select("gmr.gitlab_id, gmr.iid"),
                dal.From("_tool_gitlab_merge_requests gmr"),
@@ -87,8 +88,8 @@ func GetMergeRequestDetailsIterator(taskCtx 
plugin.SubTaskContext, collectorWith
        }
        if collectorWithState.LatestState.LatestSuccessStart != nil {
                clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
-       } else if collectorWithState.TimeAfter != nil {
-               clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
*collectorWithState.TimeAfter))
+       } else if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+               clauses = append(clauses, dal.Where("gitlab_updated_at > ?", 
*syncPolicy.TimeAfter))
        }
        // construct the input iterator
        cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/gitlab/tasks/mr_note_collector.go 
b/backend/plugins/gitlab/tasks/mr_note_collector.go
index a4d5f40ff..d6fde5751 100644
--- a/backend/plugins/gitlab/tasks/mr_note_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_note_collector.go
@@ -40,7 +40,7 @@ var CollectApiMrNotesMeta = plugin.SubTaskMeta{
 
 func CollectApiMergeRequestsNotes(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_MERGE_REQUEST_NOTES_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
diff --git a/backend/plugins/gitlab/tasks/pipeline_collector.go 
b/backend/plugins/gitlab/tasks/pipeline_collector.go
index 1ef244887..f93d21655 100644
--- a/backend/plugins/gitlab/tasks/pipeline_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_collector.go
@@ -44,7 +44,7 @@ var CollectApiPipelinesMeta = plugin.SubTaskMeta{
 
 func CollectApiPipelines(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_PIPELINE_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
@@ -54,6 +54,7 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext) 
errors.Error {
                return err
        }
        incremental := collectorWithState.IsIncremental()
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                RawDataSubTaskArgs: *rawDataSubTaskArgs,
                ApiClient:          data.ApiClient,
@@ -63,8 +64,8 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext) 
errors.Error {
                UrlTemplate:        "projects/{{ .Params.ProjectId 
}}/pipelines",
                Query: func(reqData *helper.RequestData) (url.Values, 
errors.Error) {
                        query := url.Values{}
-                       if collectorWithState.TimeAfter != nil {
-                               query.Set("updated_after", 
collectorWithState.TimeAfter.Format(time.RFC3339))
+                       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+                               query.Set("updated_after", 
syncPolicy.TimeAfter.Format(time.RFC3339))
                        }
                        if incremental {
                                query.Set("updated_after", 
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339))
diff --git a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go 
b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
index 856659adb..a37807b8c 100644
--- a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
@@ -45,7 +45,7 @@ var CollectApiPipelineDetailsMeta = plugin.SubTaskMeta{
 
 func CollectApiPipelineDetails(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_PIPELINE_DETAILS_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
diff --git a/backend/plugins/gitlab/tasks/task_data.go 
b/backend/plugins/gitlab/tasks/task_data.go
index a49d11389..45e4abe3b 100644
--- a/backend/plugins/gitlab/tasks/task_data.go
+++ b/backend/plugins/gitlab/tasks/task_data.go
@@ -18,8 +18,6 @@ limitations under the License.
 package tasks
 
 import (
-       "time"
-
        "github.com/apache/incubator-devlake/core/errors"
        helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "github.com/apache/incubator-devlake/plugins/gitlab/models"
@@ -29,7 +27,6 @@ type GitlabOptions struct {
        ConnectionId  uint64                    `mapstructure:"connectionId" 
json:"connectionId"`
        ProjectId     int                       `mapstructure:"projectId" 
json:"projectId"`
        ScopeConfigId uint64                    `mapstructure:"scopeConfigId" 
json:"scopeConfigId"`
-       TimeAfter     string                    `mapstructure:"timeAfter" 
json:"timeAfter"`
        ScopeConfig   *models.GitlabScopeConfig `mapstructure:"scopeConfig" 
json:"scopeConfig"`
 }
 
@@ -37,7 +34,6 @@ type GitlabTaskData struct {
        Options       *GitlabOptions
        ApiClient     *helper.ApiAsyncClient
        ProjectCommit *models.GitlabProjectCommit
-       TimeAfter     *time.Time
        RegexEnricher *helper.RegexEnricher
 }
 
diff --git a/backend/plugins/jenkins/api/blueprint_v200.go 
b/backend/plugins/jenkins/api/blueprint_v200.go
index 7dca09a03..10940fb62 100644
--- a/backend/plugins/jenkins/api/blueprint_v200.go
+++ b/backend/plugins/jenkins/api/blueprint_v200.go
@@ -18,8 +18,6 @@ limitations under the License.
 package api
 
 import (
-       "time"
-
        "github.com/apache/incubator-devlake/core/errors"
        coreModels "github.com/apache/incubator-devlake/core/models"
        "github.com/apache/incubator-devlake/core/models/domainlayer"
@@ -35,10 +33,9 @@ func MakeDataSourcePipelinePlanV200(
        subtaskMetas []plugin.SubTaskMeta,
        connectionId uint64,
        bpScopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
        plan := make(coreModels.PipelinePlan, len(bpScopes))
-       plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connectionId, syncPolicy)
+       plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connectionId)
        if err != nil {
                return nil, nil, err
        }
@@ -55,7 +52,6 @@ func makeDataSourcePipelinePlanV200(
        plan coreModels.PipelinePlan,
        bpScopes []*coreModels.BlueprintScope,
        connectionId uint64,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, errors.Error) {
        for i, bpScope := range bpScopes {
                stage := plan[i]
@@ -68,10 +64,6 @@ func makeDataSourcePipelinePlanV200(
                options["scopeId"] = bpScope.ScopeId
                options["connectionId"] = connectionId
 
-               if syncPolicy.TimeAfter != nil {
-                       options["timeAfter"] = 
syncPolicy.TimeAfter.Format(time.RFC3339)
-               }
-
                // get scope config from db
                _, scopeConfig, err := 
scopeHelper.DbHelper().GetScopeAndConfig(connectionId, bpScope.ScopeId)
                if err != nil {
diff --git a/backend/plugins/jenkins/api/blueprint_v200_test.go 
b/backend/plugins/jenkins/api/blueprint_v200_test.go
index bfd61f573..561cc6c56 100644
--- a/backend/plugins/jenkins/api/blueprint_v200_test.go
+++ b/backend/plugins/jenkins/api/blueprint_v200_test.go
@@ -42,14 +42,14 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
        bs := &coreModels.BlueprintScope{
                ScopeId: "a/b/ccc",
        }
-       syncPolicy := &coreModels.SyncPolicy{}
+       
        bpScopes := make([]*coreModels.BlueprintScope, 0)
        bpScopes = append(bpScopes, bs)
 
        mockBasicRes(t)
 
        plan := make(coreModels.PipelinePlan, len(bpScopes))
-       plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes, 1, 
syncPolicy)
+       plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes, 1)
        assert.Nil(t, err)
        scopes, err := makeScopesV200(bpScopes, 1)
        assert.Nil(t, err)
diff --git a/backend/plugins/jenkins/impl/impl.go 
b/backend/plugins/jenkins/impl/impl.go
index 01970c5dc..d0807fd64 100644
--- a/backend/plugins/jenkins/impl/impl.go
+++ b/backend/plugins/jenkins/impl/impl.go
@@ -20,7 +20,6 @@ package impl
 import (
        "fmt"
        "strings"
-       "time"
 
        coreModels "github.com/apache/incubator-devlake/core/models"
 
@@ -135,13 +134,6 @@ func (p Jenkins) PrepareTaskData(taskCtx 
plugin.TaskContext, options map[string]
                return nil, err
        }
 
-       var timeAfter time.Time
-       if op.TimeAfter != "" {
-               timeAfter, err = errors.Convert01(time.Parse(time.RFC3339, 
op.TimeAfter))
-               if err != nil {
-                       return nil, errors.BadInput.Wrap(err, "invalid value 
for `timeAfter`")
-               }
-       }
        regexEnricher := helper.NewRegexEnricher()
        if err := regexEnricher.TryAdd(devops.DEPLOYMENT, 
op.ScopeConfig.DeploymentPattern); err != nil {
                return nil, errors.BadInput.Wrap(err, "invalid value for 
`deploymentPattern`")
@@ -155,10 +147,7 @@ func (p Jenkins) PrepareTaskData(taskCtx 
plugin.TaskContext, options map[string]
                Connection:    connection,
                RegexEnricher: regexEnricher,
        }
-       if !timeAfter.IsZero() {
-               taskData.TimeAfter = &timeAfter
-               logger.Debug("collect data created from %s", timeAfter)
-       }
+
        return taskData, nil
 }
 
@@ -173,9 +162,8 @@ func (p Jenkins) MigrationScripts() 
[]plugin.MigrationScript {
 func (p Jenkins) MakeDataSourcePipelinePlanV200(
        connectionId uint64,
        scopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
-       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes, syncPolicy)
+       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes)
 }
 
 func (p Jenkins) ApiResources() 
map[string]map[string]plugin.ApiResourceHandler {
diff --git a/backend/plugins/jenkins/jenkins.go 
b/backend/plugins/jenkins/jenkins.go
index 140e1167a..7b0c59523 100644
--- a/backend/plugins/jenkins/jenkins.go
+++ b/backend/plugins/jenkins/jenkins.go
@@ -30,14 +30,12 @@ func main() {
        connectionId := jenkinsCmd.Flags().Uint64P("connection", "c", 1, 
"jenkins connection id")
        jobFullName := jenkinsCmd.Flags().StringP("jobFullName", "j", "", 
"jenkins job full name")
        deployTagPattern := jenkinsCmd.Flags().String("deployTagPattern", 
"(?i)deploy", "deploy tag name")
-       timeAfter := jenkinsCmd.Flags().StringP("timeAfter", "a", "", "collect 
data that are created after specified time, ie 2006-05-06T07:08:09Z")
 
        jenkinsCmd.Run = func(cmd *cobra.Command, args []string) {
                runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
                        "connectionId":     *connectionId,
                        "jobFullName":      *jobFullName,
                        "deployTagPattern": *deployTagPattern,
-                       "timeAfter":        *timeAfter,
                })
        }
        runner.RunCmd(jenkinsCmd)
diff --git a/backend/plugins/jenkins/tasks/build_collector.go 
b/backend/plugins/jenkins/tasks/build_collector.go
index 290d3a3e6..56924f350 100644
--- a/backend/plugins/jenkins/tasks/build_collector.go
+++ b/backend/plugins/jenkins/tasks/build_collector.go
@@ -66,7 +66,6 @@ func CollectApiBuilds(taskCtx plugin.SubTaskContext) 
errors.Error {
                        Table: RAW_BUILD_TABLE,
                },
                ApiClient: data.ApiClient,
-               TimeAfter: data.TimeAfter,
                CollectNewRecordsByList: helper.FinalizableApiCollectorListArgs{
                        PageSize:    100,
                        Concurrency: 10,
diff --git a/backend/plugins/jenkins/tasks/stage_collector.go 
b/backend/plugins/jenkins/tasks/stage_collector.go
index e6c4d1b30..6c8478750 100644
--- a/backend/plugins/jenkins/tasks/stage_collector.go
+++ b/backend/plugins/jenkins/tasks/stage_collector.go
@@ -54,9 +54,9 @@ func CollectApiStages(taskCtx plugin.SubTaskContext) 
errors.Error {
                dal.Where(`tjb.connection_id = ? and tjb.job_path = ? and 
tjb.job_name = ? and tjb.class = ?`,
                        data.Options.ConnectionId, data.Options.JobPath, 
data.Options.JobName, "WorkflowRun"),
        }
-       timeAfter := data.TimeAfter
-       if timeAfter != nil {
-               clauses = append(clauses, dal.Where(`tjb.start_time >= ?`, 
timeAfter))
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
+       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+               clauses = append(clauses, dal.Where(`tjb.start_time >= ?`, 
syncPolicy.TimeAfter))
        }
 
        cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/jenkins/tasks/task_data.go 
b/backend/plugins/jenkins/tasks/task_data.go
index 3be060df7..d34b4e6ac 100644
--- a/backend/plugins/jenkins/tasks/task_data.go
+++ b/backend/plugins/jenkins/tasks/task_data.go
@@ -19,7 +19,6 @@ package tasks
 
 import (
        "strings"
-       "time"
 
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
@@ -30,11 +29,10 @@ type JenkinsApiParams models.JenkinsApiParams
 type JenkinsOptions struct {
        ConnectionId  uint64 `json:"connectionId"`
        ScopeId       string
-       ScopeConfigId uint64 `json:"scopeConfigId"`
-       JobFullName   string `json:"jobFullName"` // "path1/path2/job name"
-       JobName       string `json:"jobName"`     // "job name"
-       JobPath       string `json:"jobPath"`     // "job/path1/job/path2"
-       TimeAfter     string
+       ScopeConfigId uint64                     `json:"scopeConfigId"`
+       JobFullName   string                     `json:"jobFullName"` // 
"path1/path2/job name"
+       JobName       string                     `json:"jobName"`     // "job 
name"
+       JobPath       string                     `json:"jobPath"`     // 
"job/path1/job/path2"
        Tasks         []string                   `json:"tasks,omitempty"`
        ScopeConfig   *models.JenkinsScopeConfig `mapstructure:"scopeConfig" 
json:"scopeConfig"`
 }
@@ -43,7 +41,6 @@ type JenkinsTaskData struct {
        Options       *JenkinsOptions
        ApiClient     *api.ApiAsyncClient
        Connection    *models.JenkinsConnection
-       TimeAfter     *time.Time
        RegexEnricher *api.RegexEnricher
 }
 
diff --git a/backend/plugins/jira/api/blueprint_v200.go 
b/backend/plugins/jira/api/blueprint_v200.go
index 20be17970..d67b86b98 100644
--- a/backend/plugins/jira/api/blueprint_v200.go
+++ b/backend/plugins/jira/api/blueprint_v200.go
@@ -18,8 +18,6 @@ limitations under the License.
 package api
 
 import (
-       "time"
-
        "github.com/apache/incubator-devlake/core/errors"
        coreModels "github.com/apache/incubator-devlake/core/models"
        "github.com/apache/incubator-devlake/core/models/domainlayer"
@@ -35,10 +33,9 @@ func MakeDataSourcePipelinePlanV200(
        subtaskMetas []plugin.SubTaskMeta,
        connectionId uint64,
        bpScopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
        plan := make(coreModels.PipelinePlan, len(bpScopes))
-       plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connectionId, syncPolicy)
+       plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connectionId)
        if err != nil {
                return nil, nil, err
        }
@@ -55,7 +52,6 @@ func makeDataSourcePipelinePlanV200(
        plan coreModels.PipelinePlan,
        bpScopes []*coreModels.BlueprintScope,
        connectionId uint64,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, errors.Error) {
        for i, bpScope := range bpScopes {
                stage := plan[i]
@@ -66,9 +62,6 @@ func makeDataSourcePipelinePlanV200(
                options := make(map[string]interface{})
                options["scopeId"] = bpScope.ScopeId
                options["connectionId"] = connectionId
-               if syncPolicy.TimeAfter != nil {
-                       options["timeAfter"] = 
syncPolicy.TimeAfter.Format(time.RFC3339)
-               }
 
                // get scope config from db
                _, scopeConfig, err := 
scopeHelper.DbHelper().GetScopeAndConfig(connectionId, bpScope.ScopeId)
diff --git a/backend/plugins/jira/api/blueprint_v200_test.go 
b/backend/plugins/jira/api/blueprint_v200_test.go
index 76b9306f6..81c44186b 100644
--- a/backend/plugins/jira/api/blueprint_v200_test.go
+++ b/backend/plugins/jira/api/blueprint_v200_test.go
@@ -42,13 +42,13 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
        bs := &coreModels.BlueprintScope{
                ScopeId: "10",
        }
-       syncPolicy := &coreModels.SyncPolicy{}
+
        bpScopes := make([]*coreModels.BlueprintScope, 0)
        bpScopes = append(bpScopes, bs)
        plan := make(coreModels.PipelinePlan, len(bpScopes))
        mockBasicRes(t)
 
-       plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes, 
uint64(1), syncPolicy)
+       plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes, 
uint64(1))
        assert.Nil(t, err)
        scopes, err := makeScopesV200(bpScopes, uint64(1))
        assert.Nil(t, err)
diff --git a/backend/plugins/jira/impl/impl.go 
b/backend/plugins/jira/impl/impl.go
index 432a4142a..20e6a052f 100644
--- a/backend/plugins/jira/impl/impl.go
+++ b/backend/plugins/jira/impl/impl.go
@@ -20,7 +20,6 @@ package impl
 import (
        "fmt"
        "net/http"
-       "time"
 
        "github.com/apache/incubator-devlake/core/context"
        "github.com/apache/incubator-devlake/core/dal"
@@ -250,24 +249,15 @@ func (p Jira) PrepareTaskData(taskCtx plugin.TaskContext, 
options map[string]int
                ApiClient:      jiraApiClient,
                JiraServerInfo: *info,
        }
-       if op.TimeAfter != "" {
-               var timeAfter time.Time
-               timeAfter, err = errors.Convert01(time.Parse(time.RFC3339, 
op.TimeAfter))
-               if err != nil {
-                       return nil, errors.BadInput.Wrap(err, "invalid value 
for `timeAfter`")
-               }
-               taskData.TimeAfter = &timeAfter
-               logger.Debug("collect data created from %s", timeAfter)
-       }
+
        return taskData, nil
 }
 
 func (p Jira) MakeDataSourcePipelinePlanV200(
        connectionId uint64,
        scopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
-       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes, syncPolicy)
+       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes)
 }
 
 func (p Jira) RootPkgPath() string {
diff --git a/backend/plugins/jira/jira.go b/backend/plugins/jira/jira.go
index 208b4c82e..fdb2dfa61 100644
--- a/backend/plugins/jira/jira.go
+++ b/backend/plugins/jira/jira.go
@@ -32,12 +32,11 @@ func main() {
        boardId := cmd.Flags().Uint64P("board", "b", 0, "jira board id")
        _ = cmd.MarkFlagRequired("connection")
        _ = cmd.MarkFlagRequired("board")
-       timeAfter := cmd.Flags().StringP("timeAfter", "a", "", "collect data 
that are created after specified time, ie 2006-05-06T07:08:09Z")
+
        cmd.Run = func(c *cobra.Command, args []string) {
                runner.DirectRun(c, args, PluginEntry, map[string]interface{}{
                        "connectionId": *connectionId,
                        "boardId":      *boardId,
-                       "timeAfter":    *timeAfter,
                })
        }
        runner.RunCmd(cmd)
diff --git a/backend/plugins/jira/tasks/development_panel_collector.go 
b/backend/plugins/jira/tasks/development_panel_collector.go
index 44d65f627..278052cdc 100644
--- a/backend/plugins/jira/tasks/development_panel_collector.go
+++ b/backend/plugins/jira/tasks/development_panel_collector.go
@@ -61,7 +61,7 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext) 
errors.Error {
                },
 
                Table: RAW_DEVELOPMENT_PANEL,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
diff --git a/backend/plugins/jira/tasks/issue_changelog_collector.go 
b/backend/plugins/jira/tasks/issue_changelog_collector.go
index 99f95ffa6..7b5fc7674 100644
--- a/backend/plugins/jira/tasks/issue_changelog_collector.go
+++ b/backend/plugins/jira/tasks/issue_changelog_collector.go
@@ -60,7 +60,7 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        BoardId:      data.Options.BoardId,
                },
                Table: RAW_CHANGELOG_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
diff --git a/backend/plugins/jira/tasks/issue_collector.go 
b/backend/plugins/jira/tasks/issue_collector.go
index 19f9259c9..a6cf3b8ea 100644
--- a/backend/plugins/jira/tasks/issue_collector.go
+++ b/backend/plugins/jira/tasks/issue_collector.go
@@ -20,13 +20,14 @@ package tasks
 import (
        "encoding/json"
        "fmt"
-       "github.com/apache/incubator-devlake/core/dal"
-       "github.com/apache/incubator-devlake/plugins/jira/models"
        "io"
        "net/http"
        "net/url"
        "time"
 
+       "github.com/apache/incubator-devlake/core/dal"
+       "github.com/apache/incubator-devlake/plugins/jira/models"
+
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
@@ -61,7 +62,7 @@ func CollectIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
                        Table store raw data
                */
                Table: RAW_ISSUE_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
@@ -76,7 +77,14 @@ func CollectIssues(taskCtx plugin.SubTaskContext) 
errors.Error {
        } else {
                logger.Info("got user's timezone: %v", loc.String())
        }
-       jql := buildJQL(data.TimeAfter, 
collectorWithState.LatestState.LatestSuccessStart, incremental, loc)
+
+       jql := ""
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
+       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+               jql = buildJQL(syncPolicy.TimeAfter, 
collectorWithState.LatestState.LatestSuccessStart, incremental, loc)
+       } else {
+               jql = buildJQL(nil, 
collectorWithState.LatestState.LatestSuccessStart, incremental, loc)
+       }
 
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                ApiClient:   data.ApiClient,
diff --git a/backend/plugins/jira/tasks/issue_comment_collector.go 
b/backend/plugins/jira/tasks/issue_comment_collector.go
index c7879d4e2..fb0d2168c 100644
--- a/backend/plugins/jira/tasks/issue_comment_collector.go
+++ b/backend/plugins/jira/tasks/issue_comment_collector.go
@@ -60,7 +60,7 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext) 
errors.Error {
                        BoardId:      data.Options.BoardId,
                },
                Table: RAW_ISSUE_COMMENT_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
diff --git a/backend/plugins/jira/tasks/remotelink_collector.go 
b/backend/plugins/jira/tasks/remotelink_collector.go
index e3b141053..158640d5c 100644
--- a/backend/plugins/jira/tasks/remotelink_collector.go
+++ b/backend/plugins/jira/tasks/remotelink_collector.go
@@ -59,7 +59,7 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext) 
errors.Error {
                        BoardId:      data.Options.BoardId,
                },
                Table: RAW_REMOTELINK_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
diff --git a/backend/plugins/jira/tasks/task_data.go 
b/backend/plugins/jira/tasks/task_data.go
index 13c8f00c2..3a9b01d26 100644
--- a/backend/plugins/jira/tasks/task_data.go
+++ b/backend/plugins/jira/tasks/task_data.go
@@ -19,7 +19,6 @@ package tasks
 
 import (
        "fmt"
-       "time"
 
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
@@ -27,9 +26,8 @@ import (
 )
 
 type JiraOptions struct {
-       ConnectionId  uint64 `json:"connectionId"`
-       BoardId       uint64 `json:"boardId"`
-       TimeAfter     string
+       ConnectionId  uint64                  `json:"connectionId"`
+       BoardId       uint64                  `json:"boardId"`
        ScopeConfig   *models.JiraScopeConfig `json:"scopeConfig"`
        ScopeId       string
        ScopeConfigId uint64
@@ -39,7 +37,6 @@ type JiraOptions struct {
 type JiraTaskData struct {
        Options        *JiraOptions
        ApiClient      *api.ApiAsyncClient
-       TimeAfter      *time.Time
        JiraServerInfo models.JiraServerInfo
 }
 
diff --git a/backend/plugins/jira/tasks/worklog_collector.go 
b/backend/plugins/jira/tasks/worklog_collector.go
index bd536eb47..d0feab119 100644
--- a/backend/plugins/jira/tasks/worklog_collector.go
+++ b/backend/plugins/jira/tasks/worklog_collector.go
@@ -52,7 +52,7 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        BoardId:      data.Options.BoardId,
                },
                Table: RAW_WORKLOGS_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
diff --git a/backend/plugins/pagerduty/api/blueprint_v200.go 
b/backend/plugins/pagerduty/api/blueprint_v200.go
index c213e4755..a1c8042fe 100644
--- a/backend/plugins/pagerduty/api/blueprint_v200.go
+++ b/backend/plugins/pagerduty/api/blueprint_v200.go
@@ -18,8 +18,6 @@ limitations under the License.
 package api
 
 import (
-       "time"
-
        "github.com/apache/incubator-devlake/core/errors"
        coreModels "github.com/apache/incubator-devlake/core/models"
        "github.com/apache/incubator-devlake/core/models/domainlayer"
@@ -36,7 +34,6 @@ func MakeDataSourcePipelinePlanV200(
        subtaskMetas []plugin.SubTaskMeta,
        connectionId uint64,
        bpScopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
        // get the connection info for url
        connection := &models.PagerDutyConnection{}
@@ -46,7 +43,7 @@ func MakeDataSourcePipelinePlanV200(
        }
 
        plan := make(coreModels.PipelinePlan, len(bpScopes))
-       plan, err = makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connection, syncPolicy)
+       plan, err = makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connection)
        if err != nil {
                return nil, nil, err
        }
@@ -63,7 +60,6 @@ func makeDataSourcePipelinePlanV200(
        plan coreModels.PipelinePlan,
        bpScopes []*coreModels.BlueprintScope,
        connection *models.PagerDutyConnection,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, errors.Error) {
        for i, bpScope := range bpScopes {
                // get board and scope config from db
@@ -77,9 +73,7 @@ func makeDataSourcePipelinePlanV200(
                        ServiceId:    service.Id,
                        ServiceName:  service.Name,
                }
-               if syncPolicy.TimeAfter != nil {
-                       op.TimeAfter = syncPolicy.TimeAfter.Format(time.RFC3339)
-               }
+
                var options map[string]any
                options, err = tasks.EncodeTaskOptions(op)
                if err != nil {
diff --git a/backend/plugins/pagerduty/impl/impl.go 
b/backend/plugins/pagerduty/impl/impl.go
index 14737971d..273e578b3 100644
--- a/backend/plugins/pagerduty/impl/impl.go
+++ b/backend/plugins/pagerduty/impl/impl.go
@@ -19,7 +19,6 @@ package impl
 
 import (
        "fmt"
-       "time"
 
        "github.com/apache/incubator-devlake/core/context"
        "github.com/apache/incubator-devlake/core/dal"
@@ -108,14 +107,7 @@ func (p PagerDuty) PrepareTaskData(taskCtx 
plugin.TaskContext, options map[strin
        if err != nil {
                return nil, errors.Default.Wrap(err, "unable to get Pagerduty 
connection by the given connection ID")
        }
-       var timeAfter *time.Time
-       if op.TimeAfter != "" {
-               convertedTime, err := errors.Convert01(time.Parse(time.RFC3339, 
op.TimeAfter))
-               if err != nil {
-                       return nil, errors.BadInput.Wrap(err, 
fmt.Sprintf("invalid value for `timeAfter`: %s", timeAfter))
-               }
-               timeAfter = &convertedTime
-       }
+
        client, err := helper.NewApiClient(taskCtx.GetContext(), 
connection.Endpoint, map[string]string{
                "Authorization": fmt.Sprintf("Token %s", connection.Token),
        }, 0, connection.Proxy, taskCtx)
@@ -127,9 +119,8 @@ func (p PagerDuty) PrepareTaskData(taskCtx 
plugin.TaskContext, options map[strin
                return nil, err
        }
        return &tasks.PagerDutyTaskData{
-               Options:   op,
-               TimeAfter: timeAfter,
-               Client:    asyncClient,
+               Options: op,
+               Client:  asyncClient,
        }, nil
 }
 
@@ -177,9 +168,8 @@ func (p PagerDuty) ApiResources() 
map[string]map[string]plugin.ApiResourceHandle
 func (p PagerDuty) MakeDataSourcePipelinePlanV200(
        connectionId uint64,
        scopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
-       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes, syncPolicy)
+       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes)
 }
 
 func (p PagerDuty) Close(taskCtx plugin.TaskContext) errors.Error {
diff --git a/backend/plugins/pagerduty/tasks/incidents_collector.go 
b/backend/plugins/pagerduty/tasks/incidents_collector.go
index d8cae2f1f..91068bf5b 100644
--- a/backend/plugins/pagerduty/tasks/incidents_collector.go
+++ b/backend/plugins/pagerduty/tasks/incidents_collector.go
@@ -69,7 +69,6 @@ func CollectIncidents(taskCtx plugin.SubTaskContext) 
errors.Error {
        collector, err := 
api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
                RawDataSubTaskArgs: args,
                ApiClient:          data.Client,
-               TimeAfter:          data.TimeAfter,
                CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
                        PageSize: 100,
                        GetNextPageCustomData: func(prevReqData 
*api.RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) {
diff --git a/backend/plugins/pagerduty/tasks/task_data.go 
b/backend/plugins/pagerduty/tasks/task_data.go
index 894628073..5f456f725 100644
--- a/backend/plugins/pagerduty/tasks/task_data.go
+++ b/backend/plugins/pagerduty/tasks/task_data.go
@@ -18,8 +18,6 @@ limitations under the License.
 package tasks
 
 import (
-       "time"
-
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "github.com/apache/incubator-devlake/plugins/pagerduty/models"
@@ -27,7 +25,6 @@ import (
 
 type PagerDutyOptions struct {
        ConnectionId uint64   `json:"connectionId"`
-       TimeAfter    string   `json:"time_after,omitempty"`
        ServiceId    string   `json:"service_id,omitempty"`
        ServiceName  string   `json:"service_name,omitempty"`
        Tasks        []string `json:"tasks,omitempty"`
@@ -35,9 +32,8 @@ type PagerDutyOptions struct {
 }
 
 type PagerDutyTaskData struct {
-       Options   *PagerDutyOptions
-       TimeAfter *time.Time
-       Client    api.RateLimitedApiClient
+       Options *PagerDutyOptions
+       Client  api.RateLimitedApiClient
 }
 
 func (p *PagerDutyOptions) GetParams() any {
diff --git a/backend/plugins/sonarqube/api/blueprint_v200.go 
b/backend/plugins/sonarqube/api/blueprint_v200.go
index 95f886c20..b1148e530 100644
--- a/backend/plugins/sonarqube/api/blueprint_v200.go
+++ b/backend/plugins/sonarqube/api/blueprint_v200.go
@@ -38,10 +38,9 @@ func MakeDataSourcePipelinePlanV200(
        subtaskMetas []plugin.SubTaskMeta,
        connectionId uint64,
        bpScopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
        plan := make(coreModels.PipelinePlan, len(bpScopes))
-       plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connectionId, syncPolicy)
+       plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connectionId)
        if err != nil {
                return nil, nil, err
        }
@@ -58,7 +57,6 @@ func makeDataSourcePipelinePlanV200(
        plan coreModels.PipelinePlan,
        bpScopes []*coreModels.BlueprintScope,
        connectionId uint64,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, errors.Error) {
        for i, bpScope := range bpScopes {
                stage := plan[i]
diff --git a/backend/plugins/sonarqube/api/blueprint_v200_test.go 
b/backend/plugins/sonarqube/api/blueprint_v200_test.go
index fd1937119..093355e8a 100644
--- a/backend/plugins/sonarqube/api/blueprint_v200_test.go
+++ b/backend/plugins/sonarqube/api/blueprint_v200_test.go
@@ -41,11 +41,11 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
        bs := &coreModels.BlueprintScope{
                ScopeId: "f5a50c63-2e8f-4107-9014-853f6f467757",
        }
-       syncPolicy := &coreModels.SyncPolicy{}
+
        bpScopes := make([]*coreModels.BlueprintScope, 0)
        bpScopes = append(bpScopes, bs)
        plan := make(coreModels.PipelinePlan, len(bpScopes))
-       plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes, 
uint64(1), syncPolicy)
+       plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes, 
uint64(1))
        assert.Nil(t, err)
        basicRes = NewMockBasicRes()
 
diff --git a/backend/plugins/sonarqube/impl/impl.go 
b/backend/plugins/sonarqube/impl/impl.go
index ac2743104..28496b180 100644
--- a/backend/plugins/sonarqube/impl/impl.go
+++ b/backend/plugins/sonarqube/impl/impl.go
@@ -200,9 +200,8 @@ func (p Sonarqube) ApiResources() 
map[string]map[string]plugin.ApiResourceHandle
 func (p Sonarqube) MakeDataSourcePipelinePlanV200(
        connectionId uint64,
        scopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
-       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes, syncPolicy)
+       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes)
 }
 
 func (p Sonarqube) Close(taskCtx plugin.TaskContext) errors.Error {
diff --git a/backend/plugins/tapd/api/blueprint_v200.go 
b/backend/plugins/tapd/api/blueprint_v200.go
index 41aa2cfa4..8ab9cb0cb 100644
--- a/backend/plugins/tapd/api/blueprint_v200.go
+++ b/backend/plugins/tapd/api/blueprint_v200.go
@@ -19,7 +19,6 @@ package api
 
 import (
        "strconv"
-       "time"
 
        "github.com/apache/incubator-devlake/core/errors"
        coreModels "github.com/apache/incubator-devlake/core/models"
@@ -36,10 +35,9 @@ func MakeDataSourcePipelinePlanV200(
        subtaskMetas []plugin.SubTaskMeta,
        connectionId uint64,
        bpScopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
        plan := make(coreModels.PipelinePlan, len(bpScopes))
-       plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connectionId, syncPolicy)
+       plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connectionId)
        if err != nil {
                return nil, nil, err
        }
@@ -56,7 +54,6 @@ func makeDataSourcePipelinePlanV200(
        plan coreModels.PipelinePlan,
        bpScopes []*coreModels.BlueprintScope,
        connectionId uint64,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, errors.Error) {
        for i, bpScope := range bpScopes {
                stage := plan[i]
@@ -71,9 +68,6 @@ func makeDataSourcePipelinePlanV200(
                }
                options["workspaceId"] = intNum
                options["connectionId"] = connectionId
-               if syncPolicy.TimeAfter != nil {
-                       options["timeAfter"] = 
syncPolicy.TimeAfter.Format(time.RFC3339)
-               }
 
                _, scopeConfig, err := 
scopeHelper.DbHelper().GetScopeAndConfig(connectionId, bpScope.ScopeId)
                if err != nil {
diff --git a/backend/plugins/tapd/api/blueprint_v200_test.go 
b/backend/plugins/tapd/api/blueprint_v200_test.go
index dac1d9a43..bb3a17eec 100644
--- a/backend/plugins/tapd/api/blueprint_v200_test.go
+++ b/backend/plugins/tapd/api/blueprint_v200_test.go
@@ -42,13 +42,12 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
        bs := &coreModels.BlueprintScope{
                ScopeId: "10",
        }
-       syncPolicy := &coreModels.SyncPolicy{}
        bpScopes := make([]*coreModels.BlueprintScope, 0)
        bpScopes = append(bpScopes, bs)
        plan := make(coreModels.PipelinePlan, len(bpScopes))
        mockBasicRes(t)
 
-       plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes, 
uint64(1), syncPolicy)
+       plan, err = makeDataSourcePipelinePlanV200(nil, plan, bpScopes, 
uint64(1))
        assert.Nil(t, err)
        scopes, err := makeScopesV200(bpScopes, uint64(1))
        assert.Nil(t, err)
diff --git a/backend/plugins/tapd/impl/impl.go 
b/backend/plugins/tapd/impl/impl.go
index f72a81a4c..04ec1d62d 100644
--- a/backend/plugins/tapd/impl/impl.go
+++ b/backend/plugins/tapd/impl/impl.go
@@ -255,24 +255,14 @@ func (p Tapd) PrepareTaskData(taskCtx plugin.TaskContext, 
options map[string]int
                ApiClient:  tapdApiClient,
                Connection: connection,
        }
-       if op.TimeAfter != "" {
-               var timeAfter time.Time
-               timeAfter, err = errors.Convert01(time.Parse(time.RFC3339, 
op.TimeAfter))
-               if err != nil {
-                       return nil, errors.BadInput.Wrap(err, "invalid value 
for `timeAfter`")
-               }
-               taskData.TimeAfter = &timeAfter
-               logger.Debug("collect data updated timeAfter %s", timeAfter)
-       }
        return taskData, nil
 }
 
 func (p Tapd) MakeDataSourcePipelinePlanV200(
        connectionId uint64,
        scopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
-       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes, syncPolicy)
+       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes)
 }
 
 func (p Tapd) RootPkgPath() string {
diff --git a/backend/plugins/tapd/tasks/bug_changelog_collector.go 
b/backend/plugins/tapd/tasks/bug_changelog_collector.go
index 32992091c..b967cfa78 100644
--- a/backend/plugins/tapd/tasks/bug_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/bug_changelog_collector.go
@@ -34,11 +34,12 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_BUG_CHANGELOG_TABLE)
        logger := taskCtx.GetLogger()
        logger.Info("collect storyChangelogs")
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
        incremental := collectorWithState.IsIncremental()
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
 
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                Incremental: incremental,
@@ -51,10 +52,10 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created desc")
-                       if data.TimeAfter != nil {
+                       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
                                query.Set("created",
                                        fmt.Sprintf(">%s",
-                                               
data.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+                                               
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        if incremental {
                                query.Set("created",
diff --git a/backend/plugins/tapd/tasks/bug_collector.go 
b/backend/plugins/tapd/tasks/bug_collector.go
index 530c5ff04..3265d320f 100644
--- a/backend/plugins/tapd/tasks/bug_collector.go
+++ b/backend/plugins/tapd/tasks/bug_collector.go
@@ -19,10 +19,11 @@ package tasks
 
 import (
        "fmt"
+       "net/url"
+
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
        helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-       "net/url"
 )
 
 const RAW_BUG_TABLE = "tapd_api_bugs"
@@ -33,12 +34,12 @@ func CollectBugs(taskCtx plugin.SubTaskContext) 
errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_BUG_TABLE)
        logger := taskCtx.GetLogger()
        logger.Info("collect bugs")
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
        incremental := collectorWithState.IsIncremental()
-
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                Incremental: incremental,
                ApiClient:   data.ApiClient,
@@ -51,10 +52,10 @@ func CollectBugs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("fields", "labels")
                        query.Set("order", "created asc")
-                       if data.TimeAfter != nil {
+                       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
                                query.Set("modified",
                                        fmt.Sprintf(">%s",
-                                               
data.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+                                               
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        if incremental {
                                query.Set("modified",
diff --git a/backend/plugins/tapd/tasks/bug_commit_collector.go 
b/backend/plugins/tapd/tasks/bug_commit_collector.go
index 26a8e1e11..3209b6bf2 100644
--- a/backend/plugins/tapd/tasks/bug_commit_collector.go
+++ b/backend/plugins/tapd/tasks/bug_commit_collector.go
@@ -20,14 +20,15 @@ package tasks
 import (
        "encoding/json"
        "fmt"
+       "net/http"
+       "net/url"
+       "reflect"
+
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "github.com/apache/incubator-devlake/plugins/tapd/models"
-       "net/http"
-       "net/url"
-       "reflect"
 )
 
 const RAW_BUG_COMMIT_TABLE = "tapd_api_bug_commits"
@@ -37,20 +38,21 @@ var _ plugin.SubTaskEntryPoint = CollectBugCommits
 func CollectBugCommits(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_BUG_COMMIT_TABLE)
        db := taskCtx.GetDal()
-       collectorWithState, err := 
api.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect issueCommits")
        incremental := collectorWithState.IsIncremental()
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        clauses := []dal.Clause{
                dal.Select("_tool_tapd_bugs.id as issue_id, modified as 
update_time"),
                dal.From(&models.TapdBug{}),
                dal.Where("_tool_tapd_bugs.connection_id = ? and 
_tool_tapd_bugs.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if collectorWithState.TimeAfter != nil {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.TimeAfter))
+       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+               clauses = append(clauses, dal.Where("modified > ?", 
*syncPolicy.TimeAfter))
        }
        if incremental {
                clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
diff --git a/backend/plugins/tapd/tasks/iteration_collector.go 
b/backend/plugins/tapd/tasks/iteration_collector.go
index 7e18030f9..fe21fbc4f 100644
--- a/backend/plugins/tapd/tasks/iteration_collector.go
+++ b/backend/plugins/tapd/tasks/iteration_collector.go
@@ -36,11 +36,12 @@ func CollectIterations(taskCtx plugin.SubTaskContext) 
errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_ITERATION_TABLE)
        logger := taskCtx.GetLogger()
        logger.Info("collect iterations")
-       collectorWithState, err := 
api.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
        incremental := collectorWithState.IsIncremental()
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        err = collectorWithState.InitCollector(api.ApiCollectorArgs{
                Incremental: incremental,
                ApiClient:   data.ApiClient,
@@ -53,10 +54,10 @@ func CollectIterations(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if data.TimeAfter != nil {
+                       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
                                query.Set("modified",
                                        fmt.Sprintf(">%s",
-                                               
data.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+                                               
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        if incremental {
                                query.Set("modified",
diff --git a/backend/plugins/tapd/tasks/story_bug_collector.go 
b/backend/plugins/tapd/tasks/story_bug_collector.go
index 0115399f5..cfb27fa69 100644
--- a/backend/plugins/tapd/tasks/story_bug_collector.go
+++ b/backend/plugins/tapd/tasks/story_bug_collector.go
@@ -19,13 +19,14 @@ package tasks
 
 import (
        "fmt"
+       "net/url"
+       "reflect"
+
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "github.com/apache/incubator-devlake/plugins/tapd/models"
-       "net/url"
-       "reflect"
 )
 
 const RAW_STORY_BUG_TABLE = "tapd_api_story_bugs"
@@ -35,20 +36,21 @@ var _ plugin.SubTaskEntryPoint = CollectStoryBugs
 func CollectStoryBugs(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_STORY_BUG_TABLE)
        db := taskCtx.GetDal()
-       collectorWithState, err := 
api.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect storyBugs")
        incremental := collectorWithState.IsIncremental()
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        clauses := []dal.Clause{
                dal.Select("id as issue_id, modified as update_time"),
                dal.From(&models.TapdStory{}),
                dal.Where("_tool_tapd_stories.connection_id = ? and 
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if collectorWithState.TimeAfter != nil {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.TimeAfter))
+       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+               clauses = append(clauses, dal.Where("modified > ?", 
*syncPolicy.TimeAfter))
        }
        if incremental {
                clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
diff --git a/backend/plugins/tapd/tasks/story_changelog_collector.go 
b/backend/plugins/tapd/tasks/story_changelog_collector.go
index c7ef61407..a0bad43bd 100644
--- a/backend/plugins/tapd/tasks/story_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/story_changelog_collector.go
@@ -32,13 +32,14 @@ var _ plugin.SubTaskEntryPoint = CollectStoryChangelogs
 
 func CollectStoryChangelogs(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_STORY_CHANGELOG_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect storyChangelogs")
        incremental := collectorWithState.IsIncremental()
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
 
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                Incremental: incremental,
@@ -51,10 +52,10 @@ func CollectStoryChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if data.TimeAfter != nil {
+                       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
                                query.Set("created",
                                        fmt.Sprintf(">%s",
-                                               
data.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+                                               
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        if incremental {
                                query.Set("created",
diff --git a/backend/plugins/tapd/tasks/story_collector.go 
b/backend/plugins/tapd/tasks/story_collector.go
index e6df82388..e88dd87ac 100644
--- a/backend/plugins/tapd/tasks/story_collector.go
+++ b/backend/plugins/tapd/tasks/story_collector.go
@@ -34,11 +34,12 @@ func CollectStorys(taskCtx plugin.SubTaskContext) 
errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_STORY_TABLE)
        logger := taskCtx.GetLogger()
        logger.Info("collect stories")
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
        incremental := collectorWithState.IsIncremental()
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                Incremental: incremental,
                ApiClient:   data.ApiClient,
@@ -51,10 +52,10 @@ func CollectStorys(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("fields", "labels")
                        query.Set("order", "created asc")
-                       if data.TimeAfter != nil {
+                       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
                                query.Set("modified",
                                        fmt.Sprintf(">%s",
-                                               
data.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+                                               
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        if incremental {
                                query.Set("modified",
diff --git a/backend/plugins/tapd/tasks/story_commit_collector.go 
b/backend/plugins/tapd/tasks/story_commit_collector.go
index 1b0f63505..8e2ad026f 100644
--- a/backend/plugins/tapd/tasks/story_commit_collector.go
+++ b/backend/plugins/tapd/tasks/story_commit_collector.go
@@ -20,14 +20,15 @@ package tasks
 import (
        "encoding/json"
        "fmt"
+       "net/http"
+       "net/url"
+       "reflect"
+
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "github.com/apache/incubator-devlake/plugins/tapd/models"
-       "net/http"
-       "net/url"
-       "reflect"
 )
 
 const RAW_STORY_COMMIT_TABLE = "tapd_api_story_commits"
@@ -37,20 +38,21 @@ var _ plugin.SubTaskEntryPoint = CollectStoryCommits
 func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_STORY_COMMIT_TABLE)
        db := taskCtx.GetDal()
-       collectorWithState, err := 
api.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect issueCommits")
        incremental := collectorWithState.IsIncremental()
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        clauses := []dal.Clause{
                dal.Select("_tool_tapd_stories.id as issue_id, modified as 
update_time"),
                dal.From(&models.TapdStory{}),
                dal.Where("_tool_tapd_stories.connection_id = ? and 
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if collectorWithState.TimeAfter != nil {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.TimeAfter))
+       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+               clauses = append(clauses, dal.Where("modified > ?", 
*syncPolicy.TimeAfter))
        }
        if incremental {
                clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
diff --git a/backend/plugins/tapd/tasks/task_changelog_collector.go 
b/backend/plugins/tapd/tasks/task_changelog_collector.go
index 7f52ea28c..9a457c7bd 100644
--- a/backend/plugins/tapd/tasks/task_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/task_changelog_collector.go
@@ -32,14 +32,14 @@ var _ plugin.SubTaskEntryPoint = CollectTaskChangelogs
 
 func CollectTaskChangelogs(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_TASK_CHANGELOG_TABLE)
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect taskChangelogs")
        incremental := collectorWithState.IsIncremental()
-
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                Incremental: incremental,
                ApiClient:   data.ApiClient,
@@ -51,10 +51,10 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if data.TimeAfter != nil {
+                       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
                                query.Set("created",
                                        fmt.Sprintf(">%s",
-                                               
data.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+                                               
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        if incremental {
                                query.Set("created",
diff --git a/backend/plugins/tapd/tasks/task_collector.go 
b/backend/plugins/tapd/tasks/task_collector.go
index f0da6d10b..36d505e5c 100644
--- a/backend/plugins/tapd/tasks/task_collector.go
+++ b/backend/plugins/tapd/tasks/task_collector.go
@@ -34,12 +34,12 @@ func CollectTasks(taskCtx plugin.SubTaskContext) 
errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_TASK_TABLE)
        logger := taskCtx.GetLogger()
        logger.Info("collect tasks")
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
        incremental := collectorWithState.IsIncremental()
-
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
                RawDataSubTaskArgs: *rawDataSubTaskArgs,
                Incremental:        incremental,
@@ -53,10 +53,10 @@ func CollectTasks(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("fields", "labels")
                        query.Set("order", "created asc")
-                       if data.TimeAfter != nil {
+                       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
                                query.Set("modified",
                                        fmt.Sprintf(">%s",
-                                               
data.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+                                               
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        if incremental {
                                query.Set("modified",
diff --git a/backend/plugins/tapd/tasks/task_commit_collector.go 
b/backend/plugins/tapd/tasks/task_commit_collector.go
index 1a73a870c..8eae8e001 100644
--- a/backend/plugins/tapd/tasks/task_commit_collector.go
+++ b/backend/plugins/tapd/tasks/task_commit_collector.go
@@ -20,14 +20,15 @@ package tasks
 import (
        "encoding/json"
        "fmt"
+       "net/http"
+       "net/url"
+       "reflect"
+
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "github.com/apache/incubator-devlake/plugins/tapd/models"
-       "net/http"
-       "net/url"
-       "reflect"
 )
 
 const RAW_TASK_COMMIT_TABLE = "tapd_api_task_commits"
@@ -37,20 +38,21 @@ var _ plugin.SubTaskEntryPoint = CollectTaskCommits
 func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_TASK_COMMIT_TABLE)
        db := taskCtx.GetDal()
-       collectorWithState, err := 
api.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
        logger := taskCtx.GetLogger()
        logger.Info("collect issueCommits")
        incremental := collectorWithState.IsIncremental()
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        clauses := []dal.Clause{
                dal.Select("_tool_tapd_tasks.id as issue_id, modified as 
update_time"),
                dal.From(&models.TapdTask{}),
                dal.Where("_tool_tapd_tasks.connection_id = ? and 
_tool_tapd_tasks.workspace_id = ? ", data.Options.ConnectionId, 
data.Options.WorkspaceId),
        }
-       if collectorWithState.TimeAfter != nil {
-               clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.TimeAfter))
+       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
+               clauses = append(clauses, dal.Where("modified > ?", 
*syncPolicy.TimeAfter))
        }
        if incremental {
                clauses = append(clauses, dal.Where("modified > ?", 
*collectorWithState.LatestState.LatestSuccessStart))
diff --git a/backend/plugins/tapd/tasks/task_data.go 
b/backend/plugins/tapd/tasks/task_data.go
index fc892f0fe..16b077bbb 100644
--- a/backend/plugins/tapd/tasks/task_data.go
+++ b/backend/plugins/tapd/tasks/task_data.go
@@ -29,7 +29,6 @@ type TapdOptions struct {
        ConnectionId  uint64 `mapstruct:"connectionId"`
        WorkspaceId   uint64 `mapstruct:"workspaceId"`
        PageSize      uint64 `mapstruct:"pageSize"`
-       TimeAfter     string `json:"timeAfter" 
mapstructure:"timeAfter,omitempty"`
        CstZone       *time.Location
        ScopeConfigId uint64
        ScopeConfig   *models.TapdScopeConfig `json:"scopeConfig"`
@@ -38,7 +37,6 @@ type TapdOptions struct {
 type TapdTaskData struct {
        Options    *TapdOptions
        ApiClient  *helper.ApiAsyncClient
-       TimeAfter  *time.Time
        Connection *models.TapdConnection
 }
 
diff --git a/backend/plugins/tapd/tasks/worklog_collector.go 
b/backend/plugins/tapd/tasks/worklog_collector.go
index 076b682fb..845fbe161 100644
--- a/backend/plugins/tapd/tasks/worklog_collector.go
+++ b/backend/plugins/tapd/tasks/worklog_collector.go
@@ -34,12 +34,12 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) 
errors.Error {
        rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, 
RAW_WORKLOG_TABLE)
        logger := taskCtx.GetLogger()
        logger.Info("collect worklogs")
-       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs, data.TimeAfter)
+       collectorWithState, err := 
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
        if err != nil {
                return err
        }
        incremental := collectorWithState.IsIncremental()
-
+       syncPolicy := taskCtx.TaskContext().SyncPolicy()
        err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
                Incremental: incremental,
                ApiClient:   data.ApiClient,
@@ -51,10 +51,10 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext) 
errors.Error {
                        query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
                        query.Set("limit", fmt.Sprintf("%v", 
reqData.Pager.Size))
                        query.Set("order", "created asc")
-                       if data.TimeAfter != nil {
+                       if syncPolicy != nil && syncPolicy.TimeAfter != nil {
                                query.Set("modified",
                                        fmt.Sprintf(">%s",
-                                               
data.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
+                                               
syncPolicy.TimeAfter.In(data.Options.CstZone).Format("2006-01-02")))
                        }
                        if incremental {
                                query.Set("modified",
diff --git a/backend/plugins/teambition/api/blueprint200.go 
b/backend/plugins/teambition/api/blueprint200.go
index 6ee599725..4c5611b7c 100644
--- a/backend/plugins/teambition/api/blueprint200.go
+++ b/backend/plugins/teambition/api/blueprint200.go
@@ -19,7 +19,6 @@ package api
 
 import (
        "fmt"
-       "time"
 
        "github.com/apache/incubator-devlake/plugins/teambition/models"
 
@@ -37,10 +36,9 @@ func MakeDataSourcePipelinePlanV200(
        subtaskMetas []plugin.SubTaskMeta,
        connectionId uint64,
        bpScopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
        plan := make(coreModels.PipelinePlan, len(bpScopes))
-       plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connectionId, syncPolicy)
+       plan, err := makeDataSourcePipelinePlanV200(subtaskMetas, plan, 
bpScopes, connectionId)
        if err != nil {
                return nil, nil, err
        }
@@ -57,7 +55,6 @@ func makeDataSourcePipelinePlanV200(
        plan coreModels.PipelinePlan,
        bpScopes []*coreModels.BlueprintScope,
        connectionId uint64,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, errors.Error) {
        for i, bpScope := range bpScopes {
                stage := plan[i]
@@ -68,9 +65,6 @@ func makeDataSourcePipelinePlanV200(
                options := make(map[string]interface{})
                options["projectId"] = bpScope.ScopeId
                options["connectionId"] = connectionId
-               if syncPolicy.TimeAfter != nil {
-                       options["timeAfter"] = 
syncPolicy.TimeAfter.Format(time.RFC3339)
-               }
 
                subtasks, err := helper.MakePipelinePlanSubtasks(subtaskMetas, 
plugin.DOMAIN_TYPES)
                if err != nil {
diff --git a/backend/plugins/teambition/impl/impl.go 
b/backend/plugins/teambition/impl/impl.go
index 2aeb2f11a..410dfd173 100644
--- a/backend/plugins/teambition/impl/impl.go
+++ b/backend/plugins/teambition/impl/impl.go
@@ -19,7 +19,6 @@ package impl
 
 import (
        "fmt"
-       "time"
 
        "github.com/apache/incubator-devlake/core/context"
        "github.com/apache/incubator-devlake/core/dal"
@@ -122,9 +121,8 @@ func (p Teambition) SubTaskMetas() []plugin.SubTaskMeta {
 func (p Teambition) MakeDataSourcePipelinePlanV200(
        connectionId uint64,
        scopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
-       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes, syncPolicy)
+       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes)
 }
 
 func (p Teambition) PrepareTaskData(taskCtx plugin.TaskContext, options 
map[string]interface{}) (interface{}, errors.Error) {
@@ -152,16 +150,7 @@ func (p Teambition) PrepareTaskData(taskCtx 
plugin.TaskContext, options map[stri
                ApiClient: apiClient,
                TenantId:  connection.TenantId,
        }
-       var createdDateAfter time.Time
-       if op.TimeAfter != "" {
-               createdDateAfter, err = 
errors.Convert01(time.Parse(time.RFC3339, op.TimeAfter))
-               if err != nil {
-                       return nil, errors.BadInput.Wrap(err, "invalid value 
for `createdDateAfter`")
-               }
-       }
-       if !createdDateAfter.IsZero() {
-               taskData.TimeAfter = &createdDateAfter
-       }
+
        return taskData, nil
 }
 
diff --git a/backend/plugins/teambition/tasks/task_data.go 
b/backend/plugins/teambition/tasks/task_data.go
index 6f4f30747..961f1f33d 100644
--- a/backend/plugins/teambition/tasks/task_data.go
+++ b/backend/plugins/teambition/tasks/task_data.go
@@ -18,16 +18,16 @@ limitations under the License.
 package tasks
 
 import (
+       "time"
+
        "github.com/apache/incubator-devlake/core/errors"
        helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-       "time"
 )
 
 type TeambitionOptions struct {
        ConnectionId        uint64 `json:"connectionId"`
        ProjectId           string `json:"projectId"`
        PageSize            uint64 `mapstruct:"pageSize"`
-       TimeAfter           string `json:"timeAfter" 
mapstructure:"timeAfter,omitempty"`
        CstZone             *time.Location
        TransformationRules TransformationRules `json:"transformationRules"`
 }
@@ -35,7 +35,6 @@ type TeambitionOptions struct {
 type TeambitionTaskData struct {
        Options   *TeambitionOptions
        ApiClient *helper.ApiAsyncClient
-       TimeAfter *time.Time
        TenantId  string
 }
 
diff --git a/backend/plugins/trello/api/blueprint_v200.go 
b/backend/plugins/trello/api/blueprint_v200.go
index 4329e37df..836e5cdd3 100644
--- a/backend/plugins/trello/api/blueprint_v200.go
+++ b/backend/plugins/trello/api/blueprint_v200.go
@@ -18,8 +18,6 @@ limitations under the License.
 package api
 
 import (
-       "time"
-
        "github.com/apache/incubator-devlake/core/models/domainlayer"
        "github.com/apache/incubator-devlake/core/models/domainlayer/ticket"
 
@@ -38,7 +36,6 @@ func MakePipelinePlanV200(
        subtaskMetas []plugin.SubTaskMeta,
        connectionId uint64,
        scope []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
        scopes, err := makeScopeV200(connectionId, scope)
        if err != nil {
@@ -46,7 +43,7 @@ func MakePipelinePlanV200(
        }
 
        plan := make(coreModels.PipelinePlan, len(scope))
-       plan, err = makePipelinePlanV200(subtaskMetas, plan, scope, 
connectionId, syncPolicy)
+       plan, err = makePipelinePlanV200(subtaskMetas, plan, scope, 
connectionId)
        if err != nil {
                return nil, nil, err
        }
@@ -81,7 +78,7 @@ func makePipelinePlanV200(
        subtaskMetas []plugin.SubTaskMeta,
        plan coreModels.PipelinePlan,
        scopes []*coreModels.BlueprintScope,
-       connectionId uint64, syncPolicy *coreModels.SyncPolicy,
+       connectionId uint64,
 ) (coreModels.PipelinePlan, errors.Error) {
        for i, scope := range scopes {
                stage := plan[i]
@@ -93,9 +90,6 @@ func makePipelinePlanV200(
                options := make(map[string]interface{})
                options["connectionId"] = connectionId
                options["scopeId"] = scope.ScopeId
-               if syncPolicy.TimeAfter != nil {
-                       options["timeAfter"] = 
syncPolicy.TimeAfter.Format(time.RFC3339)
-               }
 
                _, scopeConfig, err := 
scopeHelper.DbHelper().GetScopeAndConfig(connectionId, scope.ScopeId)
                if err != nil {
diff --git a/backend/plugins/trello/impl/impl.go 
b/backend/plugins/trello/impl/impl.go
index 39b177a0a..2b2edaaea 100644
--- a/backend/plugins/trello/impl/impl.go
+++ b/backend/plugins/trello/impl/impl.go
@@ -195,9 +195,8 @@ func (p Trello) ApiResources() 
map[string]map[string]plugin.ApiResourceHandler {
 func (p Trello) MakeDataSourcePipelinePlanV200(
        connectionId uint64,
        scopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
-       return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes, 
syncPolicy)
+       return api.MakePipelinePlanV200(p.SubTaskMetas(), connectionId, scopes)
 }
 
 func (p Trello) Close(taskCtx plugin.TaskContext) errors.Error {
diff --git a/backend/plugins/zentao/api/blueprint_V200_test.go 
b/backend/plugins/zentao/api/blueprint_V200_test.go
index b379b7e3d..2cd62a269 100644
--- a/backend/plugins/zentao/api/blueprint_V200_test.go
+++ b/backend/plugins/zentao/api/blueprint_V200_test.go
@@ -71,10 +71,9 @@ func TestMakeDataSourcePipelinePlanV200(t *testing.T) {
        }*/
        bpScopes := make([]*coreModels.BlueprintScope, 0)
        bpScopes = append(bpScopes, bs)
-       syncPolicy := &coreModels.SyncPolicy{}
 
        plan := make(coreModels.PipelinePlan, len(bpScopes))
-       plan, scopes, err := makePipelinePlanV200(nil, plan, bpScopes, 
connection, syncPolicy)
+       plan, scopes, err := makePipelinePlanV200(nil, plan, bpScopes, 
connection)
        assert.Nil(t, err)
 
        expectPlan := coreModels.PipelinePlan{
diff --git a/backend/plugins/zentao/api/blueprint_v200.go 
b/backend/plugins/zentao/api/blueprint_v200.go
index b316cff43..d86105021 100644
--- a/backend/plugins/zentao/api/blueprint_v200.go
+++ b/backend/plugins/zentao/api/blueprint_v200.go
@@ -18,8 +18,6 @@ limitations under the License.
 package api
 
 import (
-       "time"
-
        "github.com/apache/incubator-devlake/core/errors"
        coreModels "github.com/apache/incubator-devlake/core/models"
        "github.com/apache/incubator-devlake/core/models/domainlayer"
@@ -36,7 +34,6 @@ func MakeDataSourcePipelinePlanV200(
        subtaskMetas []plugin.SubTaskMeta,
        connectionId uint64,
        bpScopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
        // get the connection info for url
        connection := &models.ZentaoConnection{}
@@ -46,7 +43,7 @@ func MakeDataSourcePipelinePlanV200(
        }
 
        plan := make(coreModels.PipelinePlan, len(bpScopes))
-       plan, scopes, err := makePipelinePlanV200(subtaskMetas, plan, bpScopes, 
connection, syncPolicy)
+       plan, scopes, err := makePipelinePlanV200(subtaskMetas, plan, bpScopes, 
connection)
        if err != nil {
                return nil, nil, err
        }
@@ -59,7 +56,6 @@ func makePipelinePlanV200(
        plan coreModels.PipelinePlan,
        bpScopes []*coreModels.BlueprintScope,
        connection *models.ZentaoConnection,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
        domainScopes := make([]plugin.Scope, 0)
        for i, bpScope := range bpScopes {
@@ -111,9 +107,6 @@ func makePipelinePlanV200(
                        }
                }*/
 
-               if syncPolicy.TimeAfter != nil {
-                       op.TimeAfter = syncPolicy.TimeAfter.Format(time.RFC3339)
-               }
                options, err := tasks.EncodeTaskOptions(op)
                if err != nil {
                        return nil, nil, err
diff --git a/backend/plugins/zentao/impl/impl.go 
b/backend/plugins/zentao/impl/impl.go
index cbea91b34..9f2f760cf 100644
--- a/backend/plugins/zentao/impl/impl.go
+++ b/backend/plugins/zentao/impl/impl.go
@@ -283,9 +283,8 @@ func (p Zentao) ApiResources() 
map[string]map[string]plugin.ApiResourceHandler {
 func (p Zentao) MakeDataSourcePipelinePlanV200(
        connectionId uint64,
        scopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (pp coreModels.PipelinePlan, sc []plugin.Scope, err errors.Error) {
-       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes, syncPolicy)
+       return api.MakeDataSourcePipelinePlanV200(p.SubTaskMetas(), 
connectionId, scopes)
 }
 
 func (p Zentao) Close(taskCtx plugin.TaskContext) errors.Error {
diff --git a/backend/plugins/zentao/tasks/bug_commits_collector.go 
b/backend/plugins/zentao/tasks/bug_commits_collector.go
index 63109bdfa..526ffc8a4 100644
--- a/backend/plugins/zentao/tasks/bug_commits_collector.go
+++ b/backend/plugins/zentao/tasks/bug_commits_collector.go
@@ -56,7 +56,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                Ctx:     taskCtx,
                Options: data.Options,
                Table:   RAW_BUG_COMMITS_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
diff --git a/backend/plugins/zentao/tasks/story_commits_collector.go 
b/backend/plugins/zentao/tasks/story_commits_collector.go
index 1c1552d66..117fdc7d9 100644
--- a/backend/plugins/zentao/tasks/story_commits_collector.go
+++ b/backend/plugins/zentao/tasks/story_commits_collector.go
@@ -51,7 +51,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                Ctx:     taskCtx,
                Options: data.Options,
                Table:   RAW_STORY_COMMITS_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
diff --git a/backend/plugins/zentao/tasks/task_commits_collector.go 
b/backend/plugins/zentao/tasks/task_commits_collector.go
index 0d6bacd84..4d4c54feb 100644
--- a/backend/plugins/zentao/tasks/task_commits_collector.go
+++ b/backend/plugins/zentao/tasks/task_commits_collector.go
@@ -50,7 +50,7 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext) 
errors.Error {
                Ctx:     taskCtx,
                Options: data.Options,
                Table:   RAW_TASK_COMMITS_TABLE,
-       }, data.TimeAfter)
+       })
        if err != nil {
                return err
        }
diff --git a/backend/plugins/zentao/tasks/task_data.go 
b/backend/plugins/zentao/tasks/task_data.go
index 9cf09292b..26bc9734c 100644
--- a/backend/plugins/zentao/tasks/task_data.go
+++ b/backend/plugins/zentao/tasks/task_data.go
@@ -19,7 +19,6 @@ package tasks
 
 import (
        "fmt"
-       "time"
 
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
@@ -37,7 +36,6 @@ type ZentaoOptions struct {
        ConnectionId uint64 `json:"connectionId"`
        ProjectId    int64  `json:"projectId" mapstructure:"projectId"`
        // TODO not support now
-       TimeAfter     string                    `json:"timeAfter" 
mapstructure:"timeAfter,omitempty"`
        ScopeConfigId uint64                    `json:"scopeConfigId" 
mapstructure:"scopeConfigId,omitempty"`
        ScopeConfig   *models.ZentaoScopeConfig `json:"scopeConfig" 
mapstructure:"scopeConfig,omitempty"`
 }
@@ -53,7 +51,6 @@ type ZentaoTaskData struct {
        Options  *ZentaoOptions
        RemoteDb dal.Dal
 
-       TimeAfter    *time.Time
        ProjectName  string
        Stories      map[int64]struct{}
        Tasks        map[int64]struct{}
diff --git a/backend/server/api/blueprints/blueprints.go 
b/backend/server/api/blueprints/blueprints.go
index 389247abb..b4f7410de 100644
--- a/backend/server/api/blueprints/blueprints.go
+++ b/backend/server/api/blueprints/blueprints.go
@@ -193,7 +193,6 @@ func Trigger(c *gin.Context) {
                        return
                }
        }
-
        pipeline, err := services.TriggerBlueprint(id, syncPolicy)
        if err != nil {
                shared.ApiOutputError(c, errors.Default.Wrap(err, "error 
triggering blueprint"))
diff --git a/backend/server/services/blueprint.go 
b/backend/server/services/blueprint.go
index 29246b43f..3d7e67c2e 100644
--- a/backend/server/services/blueprint.go
+++ b/backend/server/services/blueprint.go
@@ -53,12 +53,7 @@ type BlueprintJob struct {
 
 func (bj BlueprintJob) Run() {
        blueprint := bj.Blueprint
-       syncPolicy := &models.SyncPolicy{
-               TimeAfter:  blueprint.TimeAfter,
-               FullSync:   blueprint.FullSync,
-               SkipOnFail: blueprint.SkipOnFail,
-       }
-       pipeline, err := createPipelineByBlueprint(blueprint, syncPolicy)
+       pipeline, err := createPipelineByBlueprint(blueprint, 
&blueprint.SyncPolicy)
        if err == ErrEmptyPlan {
                blueprintLog.Info("Empty plan, blueprint id:[%d] blueprint 
name:[%s]", blueprint.ID, blueprint.Name)
                return
@@ -272,11 +267,9 @@ func ReloadBlueprints(c *cron.Cron) errors.Error {
 }
 
 func createPipelineByBlueprint(blueprint *models.Blueprint, syncPolicy 
*models.SyncPolicy) (*models.Pipeline, errors.Error) {
+
        var plan models.PipelinePlan
        var err errors.Error
-       if syncPolicy != nil && syncPolicy.FullSync {
-               blueprint.FullSync = true
-       }
        if blueprint.Mode == models.BLUEPRINT_MODE_NORMAL {
                plan, err = MakePlanForBlueprint(blueprint, syncPolicy)
                if err != nil {
@@ -292,12 +285,9 @@ func createPipelineByBlueprint(blueprint 
*models.Blueprint, syncPolicy *models.S
        newPipeline.BlueprintId = blueprint.ID
        newPipeline.Labels = blueprint.Labels
        newPipeline.SkipOnFail = blueprint.SkipOnFail
-
-       if syncPolicy != nil && syncPolicy.FullSync {
-               newPipeline.FullSync = true
-       } else {
-               newPipeline.FullSync = blueprint.FullSync
-       }
+       newPipeline.TimeAfter = blueprint.TimeAfter
+       newPipeline.FullSync = blueprint.FullSync
+       newPipeline.SkipCollectors = blueprint.SkipCollectors
 
        // if the plan is empty, we should not create the pipeline
        var shouldCreatePipeline bool
@@ -326,11 +316,6 @@ func createPipelineByBlueprint(blueprint 
*models.Blueprint, syncPolicy *models.S
 
 // MakePlanForBlueprint generates pipeline plan by version
 func MakePlanForBlueprint(blueprint *models.Blueprint, syncPolicy 
*models.SyncPolicy) (models.PipelinePlan, errors.Error) {
-       if syncPolicy != nil {
-               syncPolicy.TimeAfter = blueprint.TimeAfter
-               syncPolicy.FullSync = blueprint.FullSync
-       }
-
        var plan models.PipelinePlan
        // load project metric plugins and convert it to a map
        metrics := make(map[string]json.RawMessage)
@@ -344,7 +329,11 @@ func MakePlanForBlueprint(blueprint *models.Blueprint, 
syncPolicy *models.SyncPo
                        metrics[projectMetric.PluginName] = 
json.RawMessage(projectMetric.PluginOption)
                }
        }
-       plan, err := GeneratePlanJsonV200(blueprint.ProjectName, syncPolicy, 
blueprint.Connections, metrics)
+       skipCollectors := false
+       if syncPolicy != nil && syncPolicy.SkipCollectors {
+               skipCollectors = true
+       }
+       plan, err := GeneratePlanJsonV200(blueprint.ProjectName, 
blueprint.Connections, metrics, skipCollectors)
        if err != nil {
                return nil, err
        }
@@ -388,5 +377,8 @@ func TriggerBlueprint(id uint64, syncPolicy 
*models.SyncPolicy) (*models.Pipelin
                logger.Error(err, "GetBlueprint, id: %d", id)
                return nil, err
        }
+       blueprint.SkipCollectors = syncPolicy.SkipCollectors
+       blueprint.FullSync = syncPolicy.FullSync
+
        return createPipelineByBlueprint(blueprint, syncPolicy)
 }
diff --git a/backend/server/services/blueprint_makeplan_v200.go 
b/backend/server/services/blueprint_makeplan_v200.go
index b4f1bfff7..69e11bfc2 100644
--- a/backend/server/services/blueprint_makeplan_v200.go
+++ b/backend/server/services/blueprint_makeplan_v200.go
@@ -30,9 +30,9 @@ import (
 // GeneratePlanJsonV200 generates pipeline plan according v2.0.0 definition
 func GeneratePlanJsonV200(
        projectName string,
-       syncPolicy *coreModels.SyncPolicy,
        connections []*coreModels.BlueprintConnection,
        metrics map[string]json.RawMessage,
+       skipCollectors bool,
 ) (coreModels.PipelinePlan, errors.Error) {
        var err errors.Error
        // make plan for data-source coreModels fist. generate plan for each
@@ -56,7 +56,6 @@ func GeneratePlanJsonV200(
                        sourcePlans[i], pluginScopes, err = 
pluginBp.MakeDataSourcePipelinePlanV200(
                                connection.ConnectionId,
                                connection.Scopes,
-                               syncPolicy,
                        )
                        if err != nil {
                                return nil, err
@@ -72,7 +71,7 @@ func GeneratePlanJsonV200(
        }
 
        // skip collectors
-       if syncPolicy != nil && syncPolicy.SkipCollectors {
+       if skipCollectors {
                for i, plan := range sourcePlans {
                        for j, stage := range plan {
                                for k, task := range stage {
diff --git a/backend/server/services/blueprint_makeplan_v200_test.go 
b/backend/server/services/blueprint_makeplan_v200_test.go
index 093b9ea69..646996d4d 100644
--- a/backend/server/services/blueprint_makeplan_v200_test.go
+++ b/backend/server/services/blueprint_makeplan_v200_test.go
@@ -97,7 +97,7 @@ func TestMakePlanV200(t *testing.T) {
                doraName: nil,
        }
 
-       plan, err := GeneratePlanJsonV200(projectName, syncPolicy, connections, 
metrics)
+       plan, err := GeneratePlanJsonV200(projectName, connections, metrics, 
false)
        assert.Nil(t, err)
 
        assert.Equal(t, expectedPlan, plan)
diff --git a/backend/server/services/pipeline_helper.go 
b/backend/server/services/pipeline_helper.go
index a681608e0..f7077b5f6 100644
--- a/backend/server/services/pipeline_helper.go
+++ b/backend/server/services/pipeline_helper.go
@@ -61,8 +61,10 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) 
(pipeline *models.Pipelin
                Message:       "",
                SpentSeconds:  0,
                Plan:          newPipeline.Plan,
-               SkipOnFail:    newPipeline.SkipOnFail,
-               FullSync:      newPipeline.FullSync,
+               SyncPolicy: models.SyncPolicy{
+                       SkipOnFail: newPipeline.SkipOnFail,
+                       FullSync:   newPipeline.FullSync,
+               },
        }
        if newPipeline.BlueprintId != 0 {
                dbPipeline.BlueprintId = newPipeline.BlueprintId
diff --git a/backend/server/services/remote/plugin/plugin_extensions.go 
b/backend/server/services/remote/plugin/plugin_extensions.go
index 09e070383..5f1f4612c 100644
--- a/backend/server/services/remote/plugin/plugin_extensions.go
+++ b/backend/server/services/remote/plugin/plugin_extensions.go
@@ -43,7 +43,6 @@ func (p remoteMetricPlugin) 
MakeMetricPluginPipelinePlanV200(projectName string,
 func (p remoteDatasourcePlugin) MakeDataSourcePipelinePlanV200(
        connectionId uint64,
        bpScopes []*coreModels.BlueprintScope,
-       syncPolicy *coreModels.SyncPolicy,
 ) (coreModels.PipelinePlan, []plugin.Scope, errors.Error) {
        connection := p.connectionTabler.New()
        err := p.connHelper.FirstById(connection, connectionId)
diff --git a/backend/test/helper/api.go b/backend/test/helper/api.go
index 5caf817d0..9099f90ad 100644
--- a/backend/test/helper/api.go
+++ b/backend/test/helper/api.go
@@ -86,8 +86,10 @@ func (d *DevlakeClient) CreateBasicBlueprintV2(name string, 
config *BlueprintV2C
                Enable:      true,
                CronConfig:  "manual",
                IsManual:    true,
-               SkipOnFail:  config.SkipOnFail,
-               Labels:      []string{"test-label"},
+               SyncPolicy: models.SyncPolicy{
+                       SkipOnFail: config.SkipOnFail,
+               },
+               Labels: []string{"test-label"},
                Connections: []*models.BlueprintConnection{
                        config.Connection,
                },


Reply via email to