[
https://issues.apache.org/jira/browse/GOBBLIN-1783?focusedWorklogId=847593&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-847593
]
ASF GitHub Bot logged work on GOBBLIN-1783:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 24/Feb/23 23:31
Start Date: 24/Feb/23 23:31
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1117801844
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -210,26 +245,67 @@ private void scheduleSpecsFromCatalog() {
clearRunningFlowState(drUris);
}
} catch (IOException e) {
- throw new RuntimeException("Failed to get the iterator of all Spec URIS", e);
+ throw new RuntimeException("Failed to get Spec URIs with tag to clear running flow state", e);
}
- while (specUris.hasNext()) {
- Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
- try {
- // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
- if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean((
- (FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
- Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
- onAddSpec(modifiedSpec);
- } else {
- onAddSpec(spec);
+ int startOffset = 0;
+ long batchGetStartTime;
+ long batchGetEndTime;
+
+ while (startOffset < numSpecs) {
+ batchGetStartTime = System.currentTimeMillis();
+ Collection<Spec> batchOfSpecs = this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
+ Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
+ batchGetEndTime = System.currentTimeMillis();
+
+ while (batchOfSpecsIterator.hasNext()) {
+ Spec spec = batchOfSpecsIterator.next();
+ try {
+ // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+ if (spec instanceof FlowSpec && PropertiesUtils
+ .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
+ "false")) {
+ Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+ onAddSpec(modifiedSpec);
+ } else {
+ onAddSpec(spec);
+ }
+ urisLeftToSchedule.remove(spec.getUri());
+ } catch (Exception e) {
+ // If there is an uncaught error thrown during compilation, log it and continue adding flows
+ _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
}
- } catch (Exception e) {
- // If there is an uncaught error thrown during compilation, log it and continue adding flows
- _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
}
+ startOffset += this.loadSpecsBatchSize;
+ // This count is used to ensure the average spec get time is calculated accurately for the last batch which may be
+ // smaller than the loadSpecsBatchSize
+ averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) / batchOfSpecs.size();
}
+
+ // Ensure we did not miss any specs due to ordering changing (deletions/insertions) while loading
+ Iterator<URI> urisLeft = urisLeftToSchedule.iterator();
+ while (urisLeft.hasNext()) {
+ URI uri = urisLeft.next();
+ try {
+ Spec spec = this.flowCatalog.get().getSpecs(uri);
+ // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+ if (spec instanceof FlowSpec && PropertiesUtils
Review Comment:
This is duplicate code; can we extract it into one method?
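
One possible shape for that extraction, as a minimal sketch rather than the PR's actual change: both loops would call a single helper that applies the FLOW_RUN_IMMEDIATELY check before adding the spec. `SpecStub`, `addSpecOnStartup`, `disableRunImmediately`, and the `RUN_IMMEDIATELY_KEY` string are hypothetical stand-ins for illustration, not Gobblin classes or constants.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ScheduleSpecSketch {
  // Placeholder key (an assumption); the real code reads ConfigurationKeys.FLOW_RUN_IMMEDIATELY.
  static final String RUN_IMMEDIATELY_KEY = "flow.runImmediately";

  /** Hypothetical stand-in for FlowSpec: just a URI string plus its properties. */
  static final class SpecStub {
    final String uri;
    final Properties props;

    SpecStub(String uri, Properties props) {
      this.uri = uri;
      this.props = props;
    }
  }

  // Records what was handed to the scheduler, standing in for onAddSpec side effects.
  final List<SpecStub> added = new ArrayList<>();

  /** Stand-in for onAddSpec. */
  void onAddSpec(SpecStub spec) {
    added.add(spec);
  }

  /** Stand-in for disableFlowRunImmediatelyOnStart: returns a copy with the flag set to false. */
  static SpecStub disableRunImmediately(SpecStub spec) {
    Properties copy = new Properties();
    copy.putAll(spec.props);
    copy.setProperty(RUN_IMMEDIATELY_KEY, "false");
    return new SpecStub(spec.uri, copy);
  }

  /** The extracted helper: both loops would call this instead of repeating the branch. */
  void addSpecOnStartup(SpecStub spec) {
    boolean runImmediately =
        Boolean.parseBoolean(spec.props.getProperty(RUN_IMMEDIATELY_KEY, "false"));
    onAddSpec(runImmediately ? disableRunImmediately(spec) : spec);
  }

  public static void main(String[] args) {
    ScheduleSpecSketch scheduler = new ScheduleSpecSketch();
    Properties props = new Properties();
    props.setProperty(RUN_IMMEDIATELY_KEY, "true");
    scheduler.addSpecOnStartup(new SpecStub("flow/a", props));
    System.out.println(scheduler.added.get(0).props.getProperty(RUN_IMMEDIATELY_KEY));
  }
}
```

With a helper like this, the paginated loop and the catch-up loop would differ only in how they obtain the spec.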
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -210,26 +245,67 @@ private void scheduleSpecsFromCatalog() {
clearRunningFlowState(drUris);
}
} catch (IOException e) {
- throw new RuntimeException("Failed to get the iterator of all Spec URIS", e);
+ throw new RuntimeException("Failed to get Spec URIs with tag to clear running flow state", e);
}
- while (specUris.hasNext()) {
- Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
- try {
- // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
- if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean((
- (FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
- Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
- onAddSpec(modifiedSpec);
- } else {
- onAddSpec(spec);
+ int startOffset = 0;
+ long batchGetStartTime;
+ long batchGetEndTime;
+
+ while (startOffset < numSpecs) {
+ batchGetStartTime = System.currentTimeMillis();
+ Collection<Spec> batchOfSpecs = this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
+ Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
+ batchGetEndTime = System.currentTimeMillis();
+
+ while (batchOfSpecsIterator.hasNext()) {
+ Spec spec = batchOfSpecsIterator.next();
+ try {
+ // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+ if (spec instanceof FlowSpec && PropertiesUtils
+ .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
+ "false")) {
+ Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+ onAddSpec(modifiedSpec);
+ } else {
+ onAddSpec(spec);
+ }
+ urisLeftToSchedule.remove(spec.getUri());
+ } catch (Exception e) {
+ // If there is an uncaught error thrown during compilation, log it and continue adding flows
+ _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
}
- } catch (Exception e) {
- // If there is an uncaught error thrown during compilation, log it and continue adding flows
- _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
}
+ startOffset += this.loadSpecsBatchSize;
+ // This count is used to ensure the average spec get time is calculated accurately for the last batch which may be
+ // smaller than the loadSpecsBatchSize
+ averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) / batchOfSpecs.size();
}
+
+ // Ensure we did not miss any specs due to ordering changing (deletions/insertions) while loading
+ Iterator<URI> urisLeft = urisLeftToSchedule.iterator();
+ while (urisLeft.hasNext()) {
+ URI uri = urisLeft.next();
+ try {
+ Spec spec = this.flowCatalog.get().getSpecs(uri);
Review Comment:
Use getSpecWrapper? It's possible that this is an ad hoc flow that we deleted
after it executed.
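
To illustrate the concern, here is a minimal sketch of a lookup that tolerates a spec disappearing between listing its URI and fetching it. It assumes the wrapper swallows the not-found error and returns null; that behavior, along with `CatalogStub`, `SpecNotFound`, and `loadRemaining`, is a hypothetical stand-in and not taken from the Gobblin source.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MissingSpecSketch {
  /** Hypothetical exception standing in for a "spec not found" error. */
  static final class SpecNotFound extends Exception {
    SpecNotFound(String uri) {
      super("No spec for " + uri);
    }
  }

  /** Hypothetical in-memory catalog; the values are opaque spec payloads. */
  static final class CatalogStub {
    final Map<String, String> specs = new HashMap<>();

    /** Strict lookup: throws when the spec no longer exists. */
    String getSpecs(String uri) throws SpecNotFound {
      String spec = specs.get(uri);
      if (spec == null) {
        throw new SpecNotFound(uri);
      }
      return spec;
    }

    /** Assumed wrapper behavior: swallow the not-found error and return null. */
    String getSpecWrapper(String uri) {
      try {
        return getSpecs(uri);
      } catch (SpecNotFound e) {
        return null;
      }
    }
  }

  /** Catch-up loop: skips URIs whose spec was deleted after the URI list was read. */
  static List<String> loadRemaining(CatalogStub catalog, List<String> urisLeft) {
    List<String> loaded = new ArrayList<>();
    for (String uri : urisLeft) {
      String spec = catalog.getSpecWrapper(uri);
      if (spec == null) {
        continue; // e.g. an ad hoc flow removed after it executed
      }
      loaded.add(spec);
    }
    return loaded;
  }
}
```

The strict lookup would instead surface the deletion as an exception for every such URI, which the surrounding try/catch would then log as a scheduling failure.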
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +418,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}
}
+ // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+ // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+ Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.modificationTimeKey, "0"));
+ String uriString = flowSpec.getUri().toString();
+ // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
+ // enabled), spec not in scheduler, or have a modification time associated with it assume it's the most recent
+ if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)
+ && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+ if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {
Review Comment:
This might be a super corner case, but if the modification time is the same, you
might want to check whether the flow is run-immediately, since the load method
overwrites that value and the first run of the flow might be skipped.
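
One reading of this suggestion, sketched under assumptions: skip the update when the scheduler already holds a strictly newer timestamp, but on an exact tie skip only if the incoming spec is not run-immediately. `canSkip` and the map name are hypothetical, and whether a tie should be handled this way is the author's call.

```java
import java.util.HashMap;
import java.util.Map;

public class ModificationTimeSketch {
  // Hypothetical stand-in for the scheduler's per-URI last-updated timestamps.
  final Map<String, Long> lastUpdatedTime = new HashMap<>();

  /**
   * Returns true when re-adding the spec can be skipped.
   * A modification time of 0 or an unknown URI means there is nothing to compare,
   * so the incoming spec is treated as the most recent.
   */
  boolean canSkip(String uri, long modificationTime, boolean runImmediately) {
    Long known = lastUpdatedTime.get(uri);
    if (modificationTime == 0L || known == null) {
      return false;
    }
    if (known.longValue() > modificationTime) {
      return true; // the scheduler already holds a newer version
    }
    // On an exact tie, only skip when no immediate run would be lost.
    return known.longValue() == modificationTime && !runImmediately;
  }

  public static void main(String[] args) {
    ModificationTimeSketch sketch = new ModificationTimeSketch();
    sketch.lastUpdatedTime.put("flow/a", 100L);
    System.out.println(sketch.canSkip("flow/a", 100L, true));
  }
}
```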
Issue Time Tracking
-------------------
Worklog Id: (was: 847593)
Time Spent: 2h 40m (was: 2.5h)
> Initialize scheduler with batch gets instead of individual get per flow
> -----------------------------------------------------------------------
>
> Key: GOBBLIN-1783
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1783
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
> We seek to improve the initialization time of the JobScheduler upon restart or
> a leadership change by batching the MySQL queries that get flow specs.
> Instead of making one MySQL get call for each flow execution id, which scales
> extremely poorly with the number of flows, we should group them to reduce the
> number of calls and the downtime.
> This implementation adds two new functions to the SpecStore interface,
> getSortedSpecURIs and getBatchedSpecs, that we use to achieve the batching.
> Because these two functions are generic enough to be used in derived
> classes of the SpecStore, we add them to the base class. Although this
> requires any child class to implement these functions, it allows any
> consumer of the parent class SpecStore to use this functionality without
> caring about the specific SpecStore implementation used (as
> JobScheduler does). Additionally, getBatchedSpecs requires an offset or
> starting point to obtain the batches from, so the consumer has to do some
> bookkeeping of where it is in the paginated gets, but this again separates the
> functionality from the use case of the consumer. The entirety of the flow
> catalog is too large to load into memory for the Scheduler, so we use this
> batch functionality.
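
The offset bookkeeping described above can be sketched roughly as follows. This is an illustration under assumptions, not the SpecStore API: `StoreStub`, `getBatch`, and `loadAll` are hypothetical names, specs are reduced to their URI strings, and the catch-up step simply re-adds any URI the pages missed, where the real scheduler would fetch each one individually.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class BatchLoadSketch {
  /** Hypothetical in-memory store that serves URI-sorted specs one page at a time. */
  static final class StoreStub {
    final List<String> sortedUris;
    int calls = 0; // number of paginated gets issued

    StoreStub(List<String> sortedUris) {
      this.sortedUris = new ArrayList<>(sortedUris);
    }

    /** Returns up to batchSize entries starting at offset; empty past the end. */
    List<String> getBatch(int offset, int batchSize) {
      calls++;
      if (offset >= sortedUris.size()) {
        return new ArrayList<>();
      }
      int end = Math.min(offset + batchSize, sortedUris.size());
      return new ArrayList<>(sortedUris.subList(offset, end));
    }
  }

  /** Loads everything in pages, tracking the offset and the URIs not yet seen. */
  static List<String> loadAll(StoreStub store, List<String> knownUris, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    Set<String> urisLeft = new LinkedHashSet<>(knownUris);
    List<String> scheduled = new ArrayList<>();
    int offset = 0;
    while (offset < knownUris.size()) {
      for (String uri : store.getBatch(offset, batchSize)) {
        scheduled.add(uri);
        urisLeft.remove(uri);
      }
      offset += batchSize; // the consumer, not the store, remembers where it is
    }
    // Catch-up: anything still left was missed because the ordering shifted mid-load.
    scheduled.addAll(urisLeft);
    return scheduled;
  }

  public static void main(String[] args) {
    List<String> uris = Arrays.asList("flow/a", "flow/b", "flow/c", "flow/d", "flow/e");
    StoreStub store = new StoreStub(uris);
    List<String> scheduled = loadAll(store, uris, 2);
    System.out.println(scheduled.size() + " specs in " + store.calls + " calls");
  }
}
```

With five URIs and a batch size of 2, the loop issues three paginated gets instead of five individual ones.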