umustafi commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1117840401
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +418,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}
}
+ // Compare the modification timestamp of the spec being added if the
scheduler is being initialized, ideally we
+ // don't even want to do the same update twice as it will kill the
existing flow and reschedule it unnecessarily
+ Long modificationTime =
Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.modificationTimeKey,
"0"));
+ String uriString = flowSpec.getUri().toString();
+ // If the modification time is 0 (which means the original API was used to
retrieve spec or warm standby mode is not
+ // enabled), spec not in scheduler, or have a modification time associated
with it assume it's the most recent
+ if (modificationTime != 0L &&
this.scheduledFlowSpecs.containsKey(uriString)
+ && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+ if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {
+ _log.info("Ignoring the spec {} modified at time {} because we have a
more updated version from time {}",
+ addedSpec,
modificationTime,this.lastUpdatedTimeForFlowSpec.get(uriString));
+ return new AddSpecResponse(response);
+ }
+ }
+
// todo : we should probably not schedule a flow if it is a runOnce flow
this.scheduledFlowSpecs.put(flowSpecUri.toString(), addedSpec);
+ this.lastUpdatedTimeForFlowSpec.put(flowSpecUri.toString(),
modificationTime);
Review Comment:
We don't have to but there's no harm in having the check regardless.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]