This is an automated email from the ASF dual-hosted git repository. lynwee pushed a commit to branch update-update-subtask-state-manager in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit 50c31312cb83af0c2d6e7f90e08294a08bdd866f
Author: d4x1 <[email protected]>
AuthorDate: Wed Aug 7 17:28:13 2024 +0800

    refactor(helpers): update "NewSubtaskStateManager"
---
 .../pluginhelper/api/subtask_state_manager.go      | 103 ++++++++++++++-------
 1 file changed, 70 insertions(+), 33 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/subtask_state_manager.go b/backend/helpers/pluginhelper/api/subtask_state_manager.go
index 22f71453d..fb3246afe 100644
--- a/backend/helpers/pluginhelper/api/subtask_state_manager.go
+++ b/backend/helpers/pluginhelper/api/subtask_state_manager.go
@@ -25,7 +25,7 @@ import (
 	"github.com/apache/incubator-devlake/core/dal"
 	"github.com/apache/incubator-devlake/core/errors"
 	"github.com/apache/incubator-devlake/core/models"
-	plugin "github.com/apache/incubator-devlake/core/plugin"
+	"github.com/apache/incubator-devlake/core/plugin"
 	"github.com/apache/incubator-devlake/core/utils"
 )
 
@@ -75,57 +75,94 @@ type SubtaskStateManager struct {
 // NewSubtaskStateManager create a new SubtaskStateManager
 func NewSubtaskStateManager(args *SubtaskCommonArgs) (stateManager *SubtaskStateManager, err errors.Error) {
 	db := args.GetDal()
-	syncPolicy := args.SubTaskContext.TaskContext().SyncPolicy()
-	plugin := args.SubTaskContext.TaskContext().GetName()
-	subtask := args.SubTaskContext.GetName()
 	// load sync policy and make sure it is not nil
+	syncPolicy := args.SubTaskContext.TaskContext().SyncPolicy()
 	if syncPolicy == nil {
 		syncPolicy = &models.SyncPolicy{}
 	}
+
+	plugin := args.SubTaskContext.TaskContext().GetName()
+	subtask := args.SubTaskContext.GetName()
 	params := args.GetRawDataParams()
-	// load the previous state from the database
-	state := &models.SubtaskState{}
-	err = db.First(state, dal.Where(`plugin = ? AND subtask =? AND params = ?`, plugin, subtask, params))
+	preState, err := loadPreviousState(db, plugin, subtask, params)
 	if err != nil {
-		if db.IsErrorNotFound(err) {
-			state = &models.SubtaskState{
-				Plugin:  plugin,
-				Subtask: subtask,
-				Params:  params,
-			}
-			err = nil
-		} else {
-			err = errors.Default.Wrap(err, "failed to load the previous subtask state")
-			return
-		}
+		return
 	}
-	// fullsync by default
+
+	isIncremental, since := calculateStateManagerMode(syncPolicy, preState, utils.ToJsonString(args.SubtaskConfig))
+
 	now := time.Now()
 	stateManager = &SubtaskStateManager{
 		db:            db,
-		state:         state,
+		state:         preState,
 		syncPolicy:    syncPolicy,
-		isIncremental: false,
-		since:         syncPolicy.TimeAfter,
+		isIncremental: isIncremental,
+		since:         since,
 		until:         &now,
 		config:        utils.ToJsonString(args.SubtaskConfig),
 	}
 	// fallback to the previous timeAfter if no new value
 	if stateManager.since == nil {
-		stateManager.since = state.TimeAfter
+		stateManager.since = preState.TimeAfter
 	}
-	// if fullsync is set or no previous success start time, we are in the full sync mode
-	if syncPolicy.FullSync || state.PrevStartedAt == nil {
-		return
+	return
+}
+
+func loadPreviousState(db dal.Dal, plugin, subtask, params string) (*models.SubtaskState, errors.Error) {
+	// load the previous state from the database
+	preState := &models.SubtaskState{}
+	err := db.First(preState, dal.Where(`plugin = ? AND subtask =? AND params = ?`, plugin, subtask, params))
+	if err != nil {
+		if db.IsErrorNotFound(err) {
+			preState = &models.SubtaskState{
+				Plugin:  plugin,
+				Subtask: subtask,
+				Params:  params,
+			}
+		} else {
+			return nil, errors.Default.Wrap(err, "failed to load the previous subtask state")
+		}
 	}
-	// if timeAfter is not set or NOT before the previous vaule, we are in the incremental mode
-	if (syncPolicy.TimeAfter == nil || state.TimeAfter == nil || !syncPolicy.TimeAfter.Before(*state.TimeAfter)) &&
-		// and the previous config is the same as the current config
-		(state.PrevConfig == "" || state.PrevConfig == stateManager.config) {
-		stateManager.isIncremental = true
-		stateManager.since = state.PrevStartedAt
+	return preState, nil
+}
+
+func calculateStateManagerMode(syncPolicy *models.SyncPolicy, preState *models.SubtaskState, newSubtaskConfig string) (bool, *time.Time) {
+	if preState == nil || syncPolicy == nil {
+		panic("preState or syncPolicy is nil")
 	}
-	return
+
+	// User click 'Collect Data in Full Refresh Mode'
+	// No matter whether there is a successful pipeline.
+	if syncPolicy.FullSync {
+		return false, syncPolicy.TimeAfter
+	}
+	// No previous success state means this pipeline has never been executed.
+	if preState.PrevStartedAt == nil {
+		return false, syncPolicy.TimeAfter
+	}
+	// When subtask config has changed, state manager should NOT in incremental mode.
+	if subTaskConfigHasChanged(preState, newSubtaskConfig) {
+		return false, syncPolicy.TimeAfter
+	}
+	// There is a sync policy and sync policy is earlier than latest successful pipeline's timeAfter
+	if syncPolicy.TimeAfter != nil && preState.TimeAfter != nil || syncPolicy.TimeAfter.Before(*preState.TimeAfter) {
+		return false, syncPolicy.TimeAfter
+	}
+
+	// No need to do a full refresh, run task incrementally.
+	// New state manager's start time is previous state's finished time.
+	// But there is no such field, so use previous state's PrevStartedAt time.
+	return true, preState.PrevStartedAt
+}
+
+// subTaskConfigHasChanged checks whether the previous sub-task config is the same as the current sub-task config
+// When plugin's scope config changes, Subtask's config may change.
+func subTaskConfigHasChanged(preState *models.SubtaskState, newSubtaskConfig string) bool {
+	if preState == nil {
+		return true
+	}
+	preConfig := preState.PrevConfig
+	return preConfig != "" && preConfig != newSubtaskConfig
 }
 
 func (c *SubtaskStateManager) IsIncremental() bool {
