This is an automated email from the ASF dual-hosted git repository.

lynwee pushed a commit to branch release-v1.0
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v1.0 by this push:
     new 382c58d05 refactor(helpers): update "NewSubtaskStateManager" (#7861) (#7866)
382c58d05 is described below

commit 382c58d0541e13825e26a46054a5143e7bd219e7
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Aug 8 10:50:18 2024 +0800

    refactor(helpers): update "NewSubtaskStateManager" (#7861) (#7866)
    
    * refactor(helpers): update "NewSubtaskStateManager"
    
    * fix(helper): fix unit test
    
    * refactor(helpers): rename calculateStateManagerIncrementalMode
    
    Co-authored-by: Lynwee <[email protected]>
---
 .../pluginhelper/api/subtask_state_manager.go      | 105 ++++++++++++++-------
 1 file changed, 72 insertions(+), 33 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/subtask_state_manager.go b/backend/helpers/pluginhelper/api/subtask_state_manager.go
index 86453c124..407a291bf 100644
--- a/backend/helpers/pluginhelper/api/subtask_state_manager.go
+++ b/backend/helpers/pluginhelper/api/subtask_state_manager.go
@@ -25,7 +25,7 @@ import (
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/models"
-       plugin "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/core/plugin"
        "github.com/apache/incubator-devlake/core/utils"
 )
 
@@ -75,57 +75,96 @@ type SubtaskStateManager struct {
 // NewSubtaskStateManager create a new SubtaskStateManager
 func NewSubtaskStateManager(args *SubtaskCommonArgs) (stateManager *SubtaskStateManager, err errors.Error) {
        db := args.GetDal()
-       syncPolicy := args.SubTaskContext.TaskContext().SyncPolicy()
-       plugin := args.SubTaskContext.TaskContext().GetName()
-       subtask := args.SubTaskContext.GetName()
        // load sync policy and make sure it is not nil
+       syncPolicy := args.SubTaskContext.TaskContext().SyncPolicy()
        if syncPolicy == nil {
                syncPolicy = &models.SyncPolicy{}
        }
+
+       plugin := args.SubTaskContext.TaskContext().GetName()
+       subtask := args.SubTaskContext.GetName()
        params := args.GetRawDataParams()
-       // load the previous state from the database
-       state := &models.SubtaskState{}
-       err = db.First(state, dal.Where(`plugin = ? AND subtask =? AND params = ?`, plugin, subtask, params))
+       preState, err := loadPreviousState(db, plugin, subtask, params)
        if err != nil {
-               if db.IsErrorNotFound(err) {
-                       state = &models.SubtaskState{
-                               Plugin:  plugin,
-                               Subtask: subtask,
-                               Params:  params,
-                       }
-                       err = nil
-               } else {
-                       err = errors.Default.Wrap(err, "failed to load the previous subtask state")
-                       return
-               }
+               return
        }
-       // fullsync by default
+
+       isIncremental, since := calculateStateManagerIncrementalMode(syncPolicy, preState, utils.ToJsonString(args.SubtaskConfig))
+
        now := time.Now()
        stateManager = &SubtaskStateManager{
                db:            db,
-               state:         state,
+               state:         preState,
                syncPolicy:    syncPolicy,
-               isIncremental: false,
-               since:         syncPolicy.TimeAfter,
+               isIncremental: isIncremental,
+               since:         since,
                until:         &now,
                config:        utils.ToJsonString(args.SubtaskConfig),
        }
        // fallback to the previous timeAfter if no new value
        if stateManager.since == nil {
-               stateManager.since = state.TimeAfter
+               stateManager.since = preState.TimeAfter
        }
-       // if fullsync is set or no previous success start time, we are in the full sync mode
-       if syncPolicy.FullSync || state.PrevStartedAt == nil {
-               return
+       return
+}
+
+func loadPreviousState(db dal.Dal, plugin, subtask, params string) (*models.SubtaskState, errors.Error) {
+       // load the previous state from the database
+       preState := &models.SubtaskState{}
+       err := db.First(preState, dal.Where(`plugin = ? AND subtask =? AND params = ?`, plugin, subtask, params))
+       if err != nil {
+               if db.IsErrorNotFound(err) {
+                       preState = &models.SubtaskState{
+                               Plugin:  plugin,
+                               Subtask: subtask,
+                               Params:  params,
+                       }
+               } else {
+                       return nil, errors.Default.Wrap(err, "failed to load the previous subtask state")
+               }
+       }
+
+       return preState, nil
+}
+
+// calculateStateManagerIncrementalMode tries to calculate whether state manager should run in incremental mode and returns the state manager's 'since' time.
+func calculateStateManagerIncrementalMode(syncPolicy *models.SyncPolicy, preState *models.SubtaskState, newSubtaskConfig string) (bool, *time.Time) {
+       if preState == nil || syncPolicy == nil {
+               panic("preState or syncPolicy is nil")
+       }
+
+       // User click 'Collect Data in Full Refresh Mode'
+       // No matter whether there is a successful pipeline.
+       if syncPolicy.FullSync {
+               return false, syncPolicy.TimeAfter
        }
-       // if timeAfter is not set or NOT before the previous value, we are in the incremental mode
-       if (syncPolicy.TimeAfter == nil || state.TimeAfter == nil || !syncPolicy.TimeAfter.Before(*state.TimeAfter)) &&
-               // and the previous config is the same as the current config
-               (state.PrevConfig == "" || state.PrevConfig == stateManager.config) {
-               stateManager.isIncremental = true
-               stateManager.since = state.PrevStartedAt
+       // No previous success state means this pipeline has never been executed.
+       if preState.PrevStartedAt == nil {
+               return false, syncPolicy.TimeAfter
        }
-       return
+       // When subtask config has changed, state manager should NOT in incremental mode.
+       if subTaskConfigHasChanged(preState, newSubtaskConfig) {
+               return false, syncPolicy.TimeAfter
+       }
+       // There is a sync policy and sync policy is earlier than latest successful pipeline's timeAfter
+       if syncPolicy.TimeAfter != nil && preState.TimeAfter != nil && syncPolicy.TimeAfter.Before(*preState.TimeAfter) {
+               return false, syncPolicy.TimeAfter
+       }
+
+       // No need to do a full refresh, run task incrementally.
+       // New state manager's start time is previous state's finished time.
+       // But there is no such field, so use previous state's PrevStartedAt time.
+       return true, preState.PrevStartedAt
+}
+
+// subTaskConfigHasChanged checks whether the previous sub-task config is the same as the current sub-task config
+// When plugin's scope config changes, Subtask's config may change.
+func subTaskConfigHasChanged(preState *models.SubtaskState, newSubtaskConfig string) bool {
+       if preState == nil {
+               return true
+       }
+       preConfig := preState.PrevConfig
+       return preConfig != "" && preConfig != newSubtaskConfig
 }
 
 func (c *SubtaskStateManager) IsIncremental() bool {

Reply via email to