This is an automated email from the ASF dual-hosted git repository.
lynwee pushed a commit to branch release-v1.0
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/release-v1.0 by this push:
new 382c58d05 refactor(helpers): update "NewSubtaskStateManager" (#7861) (#7866)
382c58d05 is described below
commit 382c58d0541e13825e26a46054a5143e7bd219e7
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Aug 8 10:50:18 2024 +0800
refactor(helpers): update "NewSubtaskStateManager" (#7861) (#7866)
* refactor(helpers): update "NewSubtaskStateManager"
* fix(helper): fix unit test
* refactor(helpers): rename calculateStateManagerIncrementalMode
Co-authored-by: Lynwee <[email protected]>
---
.../pluginhelper/api/subtask_state_manager.go | 105 ++++++++++++++-------
1 file changed, 72 insertions(+), 33 deletions(-)
diff --git a/backend/helpers/pluginhelper/api/subtask_state_manager.go b/backend/helpers/pluginhelper/api/subtask_state_manager.go
index 86453c124..407a291bf 100644
--- a/backend/helpers/pluginhelper/api/subtask_state_manager.go
+++ b/backend/helpers/pluginhelper/api/subtask_state_manager.go
@@ -25,7 +25,7 @@ import (
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
- plugin "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/core/utils"
)
@@ -75,57 +75,96 @@ type SubtaskStateManager struct {
// NewSubtaskStateManager create a new SubtaskStateManager
func NewSubtaskStateManager(args *SubtaskCommonArgs) (stateManager
*SubtaskStateManager, err errors.Error) {
db := args.GetDal()
- syncPolicy := args.SubTaskContext.TaskContext().SyncPolicy()
- plugin := args.SubTaskContext.TaskContext().GetName()
- subtask := args.SubTaskContext.GetName()
// load sync policy and make sure it is not nil
+ syncPolicy := args.SubTaskContext.TaskContext().SyncPolicy()
if syncPolicy == nil {
syncPolicy = &models.SyncPolicy{}
}
+
+ plugin := args.SubTaskContext.TaskContext().GetName()
+ subtask := args.SubTaskContext.GetName()
params := args.GetRawDataParams()
- // load the previous state from the database
- state := &models.SubtaskState{}
- err = db.First(state, dal.Where(`plugin = ? AND subtask =? AND params =
?`, plugin, subtask, params))
+ preState, err := loadPreviousState(db, plugin, subtask, params)
if err != nil {
- if db.IsErrorNotFound(err) {
- state = &models.SubtaskState{
- Plugin: plugin,
- Subtask: subtask,
- Params: params,
- }
- err = nil
- } else {
- err = errors.Default.Wrap(err, "failed to load the
previous subtask state")
- return
- }
+ return
}
- // fullsync by default
+
+ isIncremental, since :=
calculateStateManagerIncrementalMode(syncPolicy, preState,
utils.ToJsonString(args.SubtaskConfig))
+
now := time.Now()
stateManager = &SubtaskStateManager{
db: db,
- state: state,
+ state: preState,
syncPolicy: syncPolicy,
- isIncremental: false,
- since: syncPolicy.TimeAfter,
+ isIncremental: isIncremental,
+ since: since,
until: &now,
config: utils.ToJsonString(args.SubtaskConfig),
}
// fallback to the previous timeAfter if no new value
if stateManager.since == nil {
- stateManager.since = state.TimeAfter
+ stateManager.since = preState.TimeAfter
}
- // if fullsync is set or no previous success start time, we are in the
full sync mode
- if syncPolicy.FullSync || state.PrevStartedAt == nil {
- return
+ return
+}
+
+func loadPreviousState(db dal.Dal, plugin, subtask, params string)
(*models.SubtaskState, errors.Error) {
+ // load the previous state from the database
+ preState := &models.SubtaskState{}
+ err := db.First(preState, dal.Where(`plugin = ? AND subtask =? AND
params = ?`, plugin, subtask, params))
+ if err != nil {
+ if db.IsErrorNotFound(err) {
+ preState = &models.SubtaskState{
+ Plugin: plugin,
+ Subtask: subtask,
+ Params: params,
+ }
+ } else {
+ return nil, errors.Default.Wrap(err, "failed to load
the previous subtask state")
+ }
+ }
+
+ return preState, nil
+}
+
+// calculateStateManagerIncrementalMode tries to calculate whether state
manager should run in incremental mode and returns the state manager's 'since'
time.
+func calculateStateManagerIncrementalMode(syncPolicy *models.SyncPolicy,
preState *models.SubtaskState, newSubtaskConfig string) (bool, *time.Time) {
+ if preState == nil || syncPolicy == nil {
+ panic("preState or syncPolicy is nil")
+ }
+
+ // User click 'Collect Data in Full Refresh Mode'
+ // No matter whether there is a successful pipeline.
+ if syncPolicy.FullSync {
+ return false, syncPolicy.TimeAfter
}
- // if timeAfter is not set or NOT before the previous value, we are in
the incremental mode
- if (syncPolicy.TimeAfter == nil || state.TimeAfter == nil ||
!syncPolicy.TimeAfter.Before(*state.TimeAfter)) &&
- // and the previous config is the same as the current config
- (state.PrevConfig == "" || state.PrevConfig ==
stateManager.config) {
- stateManager.isIncremental = true
- stateManager.since = state.PrevStartedAt
+ // No previous success state means this pipeline has never been
executed.
+ if preState.PrevStartedAt == nil {
+ return false, syncPolicy.TimeAfter
}
- return
+ // When subtask config has changed, state manager should NOT in
incremental mode.
+ if subTaskConfigHasChanged(preState, newSubtaskConfig) {
+ return false, syncPolicy.TimeAfter
+ }
+ // There is a sync policy and sync policy is earlier than latest
successful pipeline's timeAfter
+ if syncPolicy.TimeAfter != nil && preState.TimeAfter != nil &&
syncPolicy.TimeAfter.Before(*preState.TimeAfter) {
+ return false, syncPolicy.TimeAfter
+ }
+
+ // No need to do a full refresh, run task incrementally.
+ // New state manager's start time is previous state's finished time.
+ // But there is no such field, so use previous state's PrevStartedAt
time.
+ return true, preState.PrevStartedAt
+}
+
+// subTaskConfigHasChanged checks whether the previous sub-task config is the
same as the current sub-task config
+// When plugin's scope config changes, Subtask's config may change.
+func subTaskConfigHasChanged(preState *models.SubtaskState, newSubtaskConfig
string) bool {
+ if preState == nil {
+ return true
+ }
+ preConfig := preState.PrevConfig
+ return preConfig != "" && preConfig != newSubtaskConfig
}
func (c *SubtaskStateManager) IsIncremental() bool {