ZihanLi58 commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1119138633
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -343,8 +416,27 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}
}
+ // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+ // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+ Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.MODIFICATION_TIME_KEY, "0"));
+ String uriString = flowSpec.getUri().toString();
+ Boolean isRunImmediately = PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false");
+ // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
+ // enabled), spec not in scheduler, or have a modification time associated with it assume it's the most recent
+ if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)
+ && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+ // For run-immediately flows with a schedule the modified_time would remain the same
+ if (this.lastUpdatedTimeForFlowSpec.get(uriString) > modificationTime
+ || (this.lastUpdatedTimeForFlowSpec.get(uriString).equals(modificationTime) && !isRunImmediately)) {
Review Comment:
Why equals() rather than ==?
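
For context on the question, a minimal sketch of how the two comparisons differ, assuming lastUpdatedTimeForFlowSpec is a Map<String, Long> (the diff does not show its declaration). With two boxed Long operands, == compares object references, while equals() compares the numeric values. The > on the line above and the != 0L check unbox their operands, so they are not affected. The class and method names here are hypothetical and only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class BoxedLongComparison {
  // With two Long operands, == checks whether both refer to the same object.
  static boolean sameReference(Long a, Long b) {
    return a == b;
  }

  // equals() compares the wrapped long values.
  static boolean sameValue(Long a, Long b) {
    return a.equals(b);
  }

  public static void main(String[] args) {
    // Assumed shape of the scheduler's bookkeeping map (not confirmed by the diff).
    Map<String, Long> lastUpdated = new HashMap<>();
    lastUpdated.put("flow", Long.valueOf("1677600000000"));
    Long modificationTime = Long.valueOf("1677600000000");

    // Only values in [-128, 127] are guaranteed to come from the Long cache,
    // so for a millisecond timestamp this is typically false.
    System.out.println("==       : " + sameReference(lastUpdated.get("flow"), modificationTime));
    // Value comparison gives the intended answer.
    System.out.println("equals() : " + sameValue(lastUpdated.get("flow"), modificationTime));
  }
}
```

Unboxing one side (for example with longValue()) would also make == a value comparison, at the cost of a NullPointerException if the map entry is missing.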
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +418,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}
}
+ // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+ // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+ Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.modificationTimeKey, "0"));
+ String uriString = flowSpec.getUri().toString();
+ // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
+ // enabled), spec not in scheduler, or have a modification time associated with it assume it's the most recent
+ if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)
+ && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+ if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {
+ _log.info("Ignoring the spec {} modified at time {} because we have a more updated version from time {}",
+ addedSpec, modificationTime,this.lastUpdatedTimeForFlowSpec.get(uriString));
+ return new AddSpecResponse(response);
+ }
+ }
+
// todo : we should probably not schedule a flow if it is a runOnce flow
this.scheduledFlowSpecs.put(flowSpecUri.toString(), addedSpec);
+ this.lastUpdatedTimeForFlowSpec.put(flowSpecUri.toString(), modificationTime);
Review Comment:
I think you want to delete the entry when we remove the flow from the
scheduler. Otherwise there will be a slight memory leak, because the map keeps
growing as we receive ad-hoc flow requests.
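
A rough sketch of the suggestion, not the actual scheduler code: the class, its methods, and the String stand-in for the spec type are hypothetical. Only the two map names come from the diff. The idea is that whatever path removes a flow from scheduledFlowSpecs also removes its timestamp, so the two maps stay the same size.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the scheduler's bookkeeping; not Gobblin's API.
public class FlowSpecBookkeeping {
  private final Map<String, String> scheduledFlowSpecs = new ConcurrentHashMap<>();
  private final Map<String, Long> lastUpdatedTimeForFlowSpec = new ConcurrentHashMap<>();

  // Mirrors the two put() calls at the end of the hunk above.
  public void add(String uri, String spec, long modificationTime) {
    scheduledFlowSpecs.put(uri, spec);
    lastUpdatedTimeForFlowSpec.put(uri, modificationTime);
  }

  // Removing the flow also drops its timestamp, so ad-hoc flows leave nothing behind.
  public void remove(String uri) {
    scheduledFlowSpecs.remove(uri);
    lastUpdatedTimeForFlowSpec.remove(uri);
  }

  public int trackedTimestamps() {
    return lastUpdatedTimeForFlowSpec.size();
  }

  public static void main(String[] args) {
    FlowSpecBookkeeping bookkeeping = new FlowSpecBookkeeping();
    // Simulate a stream of ad-hoc flows that are scheduled and then removed.
    for (int i = 0; i < 1000; i++) {
      String uri = "flow-" + i;
      bookkeeping.add(uri, "spec", i + 1L);
      bookkeeping.remove(uri);
    }
    System.out.println("timestamps still tracked: " + bookkeeping.trackedTimestamps());
  }
}
```

Without the second remove() call, lastUpdatedTimeForFlowSpec would keep one entry per ad-hoc flow URI for the life of the process.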