ZihanLi58 commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1117801844
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -210,26 +245,67 @@ private void scheduleSpecsFromCatalog() {
clearRunningFlowState(drUris);
}
} catch (IOException e) {
- throw new RuntimeException("Failed to get the iterator of all Spec
URIS", e);
+ throw new RuntimeException("Failed to get Spec URIs with tag to clear
running flow state", e);
}
- while (specUris.hasNext()) {
- Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
- try {
- // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership
change if the property is set to true
- if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean((
- (FlowSpec) spec).getConfigAsProperties(),
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
- Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec)
spec);
- onAddSpec(modifiedSpec);
- } else {
- onAddSpec(spec);
+ int startOffset = 0;
+ long batchGetStartTime;
+ long batchGetEndTime;
+
+ while (startOffset < numSpecs) {
+ batchGetStartTime = System.currentTimeMillis();
+ Collection<Spec> batchOfSpecs =
this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
+ Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
+ batchGetEndTime = System.currentTimeMillis();
+
+ while (batchOfSpecsIterator.hasNext()) {
+ Spec spec = batchOfSpecsIterator.next();
+ try {
+ // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership
change if the property is set to true
+ if (spec instanceof FlowSpec && PropertiesUtils
+ .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(),
ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
+ "false")) {
+ Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec)
spec);
+ onAddSpec(modifiedSpec);
+ } else {
+ onAddSpec(spec);
+ }
+ urisLeftToSchedule.remove(spec.getUri());
+ } catch (Exception e) {
+ // If there is an uncaught error thrown during compilation, log it
and continue adding flows
+ _log.error("Could not schedule spec {} from flowCatalog due to ",
spec, e);
}
- } catch (Exception e) {
- // If there is an uncaught error thrown during compilation, log it and
continue adding flows
- _log.error("Could not schedule spec {} from flowCatalog due to ",
spec, e);
}
+ startOffset += this.loadSpecsBatchSize;
+ // This count is used to ensure the average spec get time is calculated
accurately for the last batch which may be
+ // smaller than the loadSpecsBatchSize
+ averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) /
batchOfSpecs.size();
}
+
+ // Ensure we did not miss any specs due to ordering changing
(deletions/insertions) while loading
+ Iterator<URI> urisLeft = urisLeftToSchedule.iterator();
+ while (urisLeft.hasNext()) {
+ URI uri = urisLeft.next();
+ try {
+ Spec spec = this.flowCatalog.get().getSpecs(uri);
+ // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership
change if the property is set to true
+ if (spec instanceof FlowSpec && PropertiesUtils
Review Comment:
Duplicate code; can we extract it into one method?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -210,26 +245,67 @@ private void scheduleSpecsFromCatalog() {
clearRunningFlowState(drUris);
}
} catch (IOException e) {
- throw new RuntimeException("Failed to get the iterator of all Spec
URIS", e);
+ throw new RuntimeException("Failed to get Spec URIs with tag to clear
running flow state", e);
}
- while (specUris.hasNext()) {
- Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
- try {
- // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership
change if the property is set to true
- if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean((
- (FlowSpec) spec).getConfigAsProperties(),
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
- Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec)
spec);
- onAddSpec(modifiedSpec);
- } else {
- onAddSpec(spec);
+ int startOffset = 0;
+ long batchGetStartTime;
+ long batchGetEndTime;
+
+ while (startOffset < numSpecs) {
+ batchGetStartTime = System.currentTimeMillis();
+ Collection<Spec> batchOfSpecs =
this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
+ Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
+ batchGetEndTime = System.currentTimeMillis();
+
+ while (batchOfSpecsIterator.hasNext()) {
+ Spec spec = batchOfSpecsIterator.next();
+ try {
+ // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership
change if the property is set to true
+ if (spec instanceof FlowSpec && PropertiesUtils
+ .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(),
ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
+ "false")) {
+ Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec)
spec);
+ onAddSpec(modifiedSpec);
+ } else {
+ onAddSpec(spec);
+ }
+ urisLeftToSchedule.remove(spec.getUri());
+ } catch (Exception e) {
+ // If there is an uncaught error thrown during compilation, log it
and continue adding flows
+ _log.error("Could not schedule spec {} from flowCatalog due to ",
spec, e);
}
- } catch (Exception e) {
- // If there is an uncaught error thrown during compilation, log it and
continue adding flows
- _log.error("Could not schedule spec {} from flowCatalog due to ",
spec, e);
}
+ startOffset += this.loadSpecsBatchSize;
+ // This count is used to ensure the average spec get time is calculated
accurately for the last batch which may be
+ // smaller than the loadSpecsBatchSize
+ averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) /
batchOfSpecs.size();
}
+
+ // Ensure we did not miss any specs due to ordering changing
(deletions/insertions) while loading
+ Iterator<URI> urisLeft = urisLeftToSchedule.iterator();
+ while (urisLeft.hasNext()) {
+ URI uri = urisLeft.next();
+ try {
+ Spec spec = this.flowCatalog.get().getSpecs(uri);
Review Comment:
Use getSpecWrapper? It's possible that it's an ad hoc flow and we delete it
after execution.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +418,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}
}
+ // Compare the modification timestamp of the spec being added if the
scheduler is being initialized, ideally we
+ // don't even want to do the same update twice as it will kill the
existing flow and reschedule it unnecessarily
+ Long modificationTime =
Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.modificationTimeKey,
"0"));
+ String uriString = flowSpec.getUri().toString();
+ // If the modification time is 0 (which means the original API was used to
retrieve spec or warm standby mode is not
+ // enabled), spec not in scheduler, or have a modification time associated
with it assume it's the most recent
+ if (modificationTime != 0L &&
this.scheduledFlowSpecs.containsKey(uriString)
+ && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+ if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {
Review Comment:
This might be a super corner case, but if the modification time is the same, you
might want to check whether the flow is run-immediately, as the load method
overwrites that value and might skip the first run of the flow.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]