klesh commented on code in PR #6094:
URL:
https://github.com/apache/incubator-devlake/pull/6094#discussion_r1328248839
##########
backend/helpers/pluginhelper/api/api_collector_with_state.go:
##########
@@ -49,54 +50,77 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs)
(*ApiCollectorStateManager
if err != nil {
return nil, errors.Default.Wrap(err, "Couldn't resolve raw
subtask args")
}
- latestState := models.CollectorLatestState{}
- err = db.First(&latestState, dal.Where(`raw_data_table = ? AND
raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
+
+ // CollectorLatestState retrieves the latest collector state from the
database
+ oldState := models.CollectorLatestState{}
+ err = db.First(&oldState, dal.Where(`raw_data_table = ? AND
raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
if err != nil {
if db.IsErrorNotFound(err) {
- latestState = models.CollectorLatestState{
+ oldState = models.CollectorLatestState{
RawDataTable: rawDataSubTask.table,
RawDataParams: rawDataSubTask.params,
}
} else {
return nil, errors.Default.Wrap(err, "failed to load
JiraLatestCollectorMeta")
}
}
- var timeAfter *time.Time
+ // Extract timeAfter and latestSuccessStart from old state
+ oldTimeAfter := oldState.TimeAfter
+ oldLatestSuccessStart := oldState.LatestSuccessStart
+
+ // Get syncPolicy timeAfter and fullSync from context
+ var newTimeAfter *time.Time
+ var fullSync bool
syncPolicy := args.Ctx.TaskContext().SyncPolicy()
if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- timeAfter = syncPolicy.TimeAfter
+ newTimeAfter = syncPolicy.TimeAfter
+ fullSync = syncPolicy.FullSync
}
+
+ // Calculate incremental and since based on syncPolicy and old state
+ var isIncreamtal bool
+ var since *time.Time
+
+ if oldLatestSuccessStart == nil {
+ // 1. If no oldState.LatestSuccessStart, not incremental and
since is syncPolicy.TimeAfter
+ isIncreamtal = false
+ since = newTimeAfter
+ } else if syncPolicy == nil {
+ // 2. If no syncPolicy, incremental and since is
oldState.LatestSuccessStart
+ isIncreamtal = true
+ since = oldLatestSuccessStart
+ } else if fullSync {
+ // 3. If fullSync true, not incremental and since is
syncPolicy.TimeAfter
+ isIncreamtal = false
+ since = newTimeAfter
+ } else if newTimeAfter != nil {
+ // 4. If syncPolicy.TimeAfter is not nil, compare with old
oldState.TimeAfter
+ if oldTimeAfter == nil || !newTimeAfter.Before(*oldTimeAfter) {
+ isIncreamtal = false
+ since = newTimeAfter
+ } else {
+ isIncreamtal = true
+ since = oldLatestSuccessStart
+ }
+ }
+
+ currentTime := time.Now()
+ oldState.LatestSuccessStart = &currentTime
+ oldState.TimeAfter = newTimeAfter
+
return &ApiCollectorStateManager{
RawDataSubTaskArgs: args,
- LatestState: latestState,
- TimeAfter: timeAfter,
- ExecuteStart: time.Now(),
+ NewState: oldState,
Review Comment:
This field should be `private` (i.e. unexported).
##########
backend/helpers/pluginhelper/api/api_collector_with_state.go:
##########
@@ -49,54 +50,77 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs)
(*ApiCollectorStateManager
if err != nil {
return nil, errors.Default.Wrap(err, "Couldn't resolve raw
subtask args")
}
- latestState := models.CollectorLatestState{}
- err = db.First(&latestState, dal.Where(`raw_data_table = ? AND
raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
+
+ // CollectorLatestState retrieves the latest collector state from the
database
+ oldState := models.CollectorLatestState{}
+ err = db.First(&oldState, dal.Where(`raw_data_table = ? AND
raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
if err != nil {
if db.IsErrorNotFound(err) {
- latestState = models.CollectorLatestState{
+ oldState = models.CollectorLatestState{
RawDataTable: rawDataSubTask.table,
RawDataParams: rawDataSubTask.params,
}
} else {
return nil, errors.Default.Wrap(err, "failed to load
JiraLatestCollectorMeta")
}
}
- var timeAfter *time.Time
+ // Extract timeAfter and latestSuccessStart from old state
+ oldTimeAfter := oldState.TimeAfter
+ oldLatestSuccessStart := oldState.LatestSuccessStart
+
+ // Get syncPolicy timeAfter and fullSync from context
+ var newTimeAfter *time.Time
+ var fullSync bool
syncPolicy := args.Ctx.TaskContext().SyncPolicy()
if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- timeAfter = syncPolicy.TimeAfter
+ newTimeAfter = syncPolicy.TimeAfter
+ fullSync = syncPolicy.FullSync
}
+
+ // Calculate incremental and since based on syncPolicy and old state
+ var isIncreamtal bool
+ var since *time.Time
+
+ if oldLatestSuccessStart == nil {
+ // 1. If no oldState.LatestSuccessStart, not incremental and
since is syncPolicy.TimeAfter
+ isIncreamtal = false
+ since = newTimeAfter
+ } else if syncPolicy == nil {
+ // 2. If no syncPolicy, incremental and since is
oldState.LatestSuccessStart
+ isIncreamtal = true
+ since = oldLatestSuccessStart
+ } else if fullSync {
+ // 3. If fullSync true, not incremental and since is
syncPolicy.TimeAfter
+ isIncreamtal = false
+ since = newTimeAfter
+ } else if newTimeAfter != nil {
+ // 4. If syncPolicy.TimeAfter is not nil, compare with old
oldState.TimeAfter
+ if oldTimeAfter == nil || !newTimeAfter.Before(*oldTimeAfter) {
Review Comment:
What if `oldTimeAfter` is `nil` because the user didn't specify one in
the previous execution? In that case, all available data has already been
collected once, so `isIncremental` should be `true` instead of `false`.
##########
backend/helpers/pluginhelper/api/api_collector_with_state.go:
##########
@@ -49,54 +50,77 @@ func NewStatefulApiCollector(args RawDataSubTaskArgs)
(*ApiCollectorStateManager
if err != nil {
return nil, errors.Default.Wrap(err, "Couldn't resolve raw
subtask args")
}
- latestState := models.CollectorLatestState{}
- err = db.First(&latestState, dal.Where(`raw_data_table = ? AND
raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
+
+ // CollectorLatestState retrieves the latest collector state from the
database
+ oldState := models.CollectorLatestState{}
+ err = db.First(&oldState, dal.Where(`raw_data_table = ? AND
raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
if err != nil {
if db.IsErrorNotFound(err) {
- latestState = models.CollectorLatestState{
+ oldState = models.CollectorLatestState{
RawDataTable: rawDataSubTask.table,
RawDataParams: rawDataSubTask.params,
}
} else {
return nil, errors.Default.Wrap(err, "failed to load
JiraLatestCollectorMeta")
}
}
- var timeAfter *time.Time
+ // Extract timeAfter and latestSuccessStart from old state
+ oldTimeAfter := oldState.TimeAfter
+ oldLatestSuccessStart := oldState.LatestSuccessStart
+
+ // Get syncPolicy timeAfter and fullSync from context
+ var newTimeAfter *time.Time
+ var fullSync bool
syncPolicy := args.Ctx.TaskContext().SyncPolicy()
if syncPolicy != nil && syncPolicy.TimeAfter != nil {
- timeAfter = syncPolicy.TimeAfter
+ newTimeAfter = syncPolicy.TimeAfter
+ fullSync = syncPolicy.FullSync
}
+
+ // Calculate incremental and since based on syncPolicy and old state
+ var isIncreamtal bool
+ var since *time.Time
+
+ if oldLatestSuccessStart == nil {
+ // 1. If no oldState.LatestSuccessStart, not incremental and
since is syncPolicy.TimeAfter
+ isIncreamtal = false
+ since = newTimeAfter
+ } else if syncPolicy == nil {
+ // 2. If no syncPolicy, incremental and since is
oldState.LatestSuccessStart
+ isIncreamtal = true
+ since = oldLatestSuccessStart
+ } else if fullSync {
Review Comment:
I believe this `fullSync` check should have the **highest** priority.
Consider the case where `oldLatestSuccessStart != nil` and `syncPolicy ==
nil`: this `else` branch would not be executed at all.
##########
backend/plugins/bitbucket/tasks/api_common.go:
##########
@@ -101,14 +101,7 @@ func GetQueryCreatedAndUpdated(fields string,
collectorWithState *api.ApiCollect
}
query.Set("fields", fields)
query.Set("sort", "created_on")
- if collectorWithState.IsIncremental() {
- latestSuccessStart :=
collectorWithState.LatestState.LatestSuccessStart.Format(time.RFC3339)
- query.Set("q", fmt.Sprintf("updated_on>=%s",
latestSuccessStart))
- } else if collectorWithState.TimeAfter != nil {
- timeAfter :=
collectorWithState.TimeAfter.Format(time.RFC3339)
- query.Set("q", fmt.Sprintf("updated_on>=%s", timeAfter))
- }
-
+ query.Set("q", fmt.Sprintf("updated_on>=%s",
collectorWithState.Since.Format(time.RFC3339)))
Review Comment:
Isn't it possible for `Since` to be `nil` here? If so, calling `Since.Format` would panic with a nil pointer dereference.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]