[ 
https://issues.apache.org/jira/browse/GOBBLIN-1783?focusedWorklogId=847593&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-847593
 ]

ASF GitHub Bot logged work on GOBBLIN-1783:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Feb/23 23:31
            Start Date: 24/Feb/23 23:31
    Worklog Time Spent: 10m 
      Work Description: ZihanLi58 commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1117801844


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -210,26 +245,67 @@ private void scheduleSpecsFromCatalog() {
         clearRunningFlowState(drUris);
       }
     } catch (IOException e) {
-      throw new RuntimeException("Failed to get the iterator of all Spec URIS", e);
+      throw new RuntimeException("Failed to get Spec URIs with tag to clear running flow state", e);
     }
 
-    while (specUris.hasNext()) {
-      Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
-      try {
-        // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
-        if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean((
-            (FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
-          Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
-          onAddSpec(modifiedSpec);
-        } else {
-          onAddSpec(spec);
+    int startOffset = 0;
+    long batchGetStartTime;
+    long batchGetEndTime;
+
+    while (startOffset < numSpecs) {
+      batchGetStartTime  = System.currentTimeMillis();
+      Collection<Spec> batchOfSpecs = this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
+      Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
+      batchGetEndTime = System.currentTimeMillis();
+
+      while (batchOfSpecsIterator.hasNext()) {
+        Spec spec = batchOfSpecsIterator.next();
+        try {
+          // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+          if (spec instanceof FlowSpec && PropertiesUtils
+              .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
+                  "false")) {
+            Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+            onAddSpec(modifiedSpec);
+          } else {
+            onAddSpec(spec);
+          }
+          urisLeftToSchedule.remove(spec.getUri());
+        } catch (Exception e) {
+          // If there is an uncaught error thrown during compilation, log it and continue adding flows
+          _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
         }
-      } catch (Exception e) {
-        // If there is an uncaught error thrown during compilation, log it and continue adding flows
-        _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
       }
+      startOffset += this.loadSpecsBatchSize;
+      // This count is used to ensure the average spec get time is calculated accurately for the last batch which may be
+      // smaller than the loadSpecsBatchSize
+      averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) / batchOfSpecs.size();
     }
+
+    // Ensure we did not miss any specs due to ordering changing (deletions/insertions) while loading
+    Iterator<URI> urisLeft = urisLeftToSchedule.iterator();
+    while (urisLeft.hasNext()) {
+        URI uri = urisLeft.next();
+        try {
+          Spec spec = this.flowCatalog.get().getSpecs(uri);
+          // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+          if (spec instanceof FlowSpec && PropertiesUtils

Review Comment:
   This duplicates the code in the batch loop above. Can we extract it into a single method?
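
One way to read this suggestion is a single helper that both the batch loop and the leftover-URI loop call. The following is only a minimal sketch under assumptions: `SchedulerSketch`, `FlowSpecStub`, `addSpecOnStartup`, and the property key string are hypothetical stand-ins, not the actual Gobblin classes or the method the PR ended up with.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical, simplified stand-in for the scheduler; not the real GobblinServiceJobScheduler.
class SchedulerSketch {
  // Placeholder key for illustration; the real constant is ConfigurationKeys.FLOW_RUN_IMMEDIATELY.
  static final String FLOW_RUN_IMMEDIATELY = "flow.runImmediately";

  // Minimal stand-in for a FlowSpec: a URI plus its config properties.
  static final class FlowSpecStub {
    final URI uri;
    final Properties config;

    FlowSpecStub(URI uri, Properties config) {
      this.uri = uri;
      this.config = config;
    }
  }

  // Records what was handed to onAddSpec so the behavior can be inspected.
  final List<FlowSpecStub> scheduled = new ArrayList<>();

  // Shared helper: both loops would call this instead of repeating the if/else.
  void addSpecOnStartup(FlowSpecStub spec) {
    boolean runImmediately =
        Boolean.parseBoolean(spec.config.getProperty(FLOW_RUN_IMMEDIATELY, "false"));
    if (runImmediately) {
      // Copy the config so the caller's spec is left untouched, then disable the flag.
      Properties copy = new Properties();
      copy.putAll(spec.config);
      copy.setProperty(FLOW_RUN_IMMEDIATELY, "false");
      onAddSpec(new FlowSpecStub(spec.uri, copy));
    } else {
      onAddSpec(spec);
    }
  }

  void onAddSpec(FlowSpecStub spec) {
    scheduled.add(spec);
  }
}
```

With a helper of this shape, each loop body reduces to the call plus its own error handling and bookkeeping, so the run-immediately rule lives in one place.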



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -210,26 +245,67 @@ private void scheduleSpecsFromCatalog() {
         clearRunningFlowState(drUris);
       }
     } catch (IOException e) {
-      throw new RuntimeException("Failed to get the iterator of all Spec URIS", e);
+      throw new RuntimeException("Failed to get Spec URIs with tag to clear running flow state", e);
     }
 
-    while (specUris.hasNext()) {
-      Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
-      try {
-        // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
-        if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean((
-            (FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
-          Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
-          onAddSpec(modifiedSpec);
-        } else {
-          onAddSpec(spec);
+    int startOffset = 0;
+    long batchGetStartTime;
+    long batchGetEndTime;
+
+    while (startOffset < numSpecs) {
+      batchGetStartTime  = System.currentTimeMillis();
+      Collection<Spec> batchOfSpecs = this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
+      Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
+      batchGetEndTime = System.currentTimeMillis();
+
+      while (batchOfSpecsIterator.hasNext()) {
+        Spec spec = batchOfSpecsIterator.next();
+        try {
+          // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+          if (spec instanceof FlowSpec && PropertiesUtils
+              .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
+                  "false")) {
+            Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+            onAddSpec(modifiedSpec);
+          } else {
+            onAddSpec(spec);
+          }
+          urisLeftToSchedule.remove(spec.getUri());
+        } catch (Exception e) {
+          // If there is an uncaught error thrown during compilation, log it and continue adding flows
+          _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
         }
-      } catch (Exception e) {
-        // If there is an uncaught error thrown during compilation, log it and continue adding flows
-        _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
       }
+      startOffset += this.loadSpecsBatchSize;
+      // This count is used to ensure the average spec get time is calculated accurately for the last batch which may be
+      // smaller than the loadSpecsBatchSize
+      averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) / batchOfSpecs.size();
     }
+
+    // Ensure we did not miss any specs due to ordering changing (deletions/insertions) while loading
+    Iterator<URI> urisLeft = urisLeftToSchedule.iterator();
+    while (urisLeft.hasNext()) {
+        URI uri = urisLeft.next();
+        try {
+          Spec spec = this.flowCatalog.get().getSpecs(uri);

Review Comment:
   Use getSpecWrapper instead? The spec may belong to an ad hoc flow that we 
delete after it executes, so it may no longer exist by the time we look it up here.
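
To illustrate the concern, here is a minimal sketch of a wrapper that tolerates a spec disappearing between listing and lookup. It assumes that getSpecWrapper returns null rather than throwing when the spec is missing; that behavior should be confirmed against FlowCatalog. `CatalogSketch` and its string-valued specs are hypothetical stand-ins, not the real catalog.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the flow catalog; specs are plain strings for brevity.
class CatalogSketch {
  // Local stand-in for the "spec not found" checked exception.
  static class SpecNotFoundException extends Exception {
    SpecNotFoundException(URI uri) {
      super("No spec found for " + uri);
    }
  }

  private final Map<URI, String> specs = new HashMap<>();

  void put(URI uri, String spec) {
    specs.put(uri, spec);
  }

  void remove(URI uri) {
    specs.remove(uri);
  }

  // Strict lookup: throws if the spec was deleted in the meantime.
  String getSpecs(URI uri) throws SpecNotFoundException {
    String spec = specs.get(uri);
    if (spec == null) {
      throw new SpecNotFoundException(uri);
    }
    return spec;
  }

  // Lenient lookup (assumed semantics): swallow the not-found case and return null,
  // so the caller can skip a spec that was deleted after the URI list was taken.
  String getSpecWrapper(URI uri) {
    try {
      return getSpecs(uri);
    } catch (SpecNotFoundException e) {
      return null;
    }
  }
}
```

A caller using the lenient lookup would then check for null and move on to the next URI instead of logging a scheduling error for a flow that has already run and been removed.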



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +418,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+    Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.modificationTimeKey, "0"));
+    String uriString = flowSpec.getUri().toString();
+    // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
+    // enabled), spec not in scheduler, or have a modification time associated with it assume it's the most recent
+    if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)
+        && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+      if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {

Review Comment:
   This might be a super corner case, but if the modification times are equal, 
you might want to check whether the flow is run-immediately: the load method 
overwrites that value, so skipping here might skip the first run of the flow. 
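
One possible reading of this suggestion, sketched under assumptions: skip the incoming spec when the stored timestamp is strictly newer, and on a tie skip it only if it is not marked run-immediately. `StalenessCheckSketch` and `shouldSkip` are hypothetical names, and the sketch folds the "already scheduled" check into the presence of a stored timestamp; it is not the change that was made in the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper isolating the "is this update stale?" decision.
class StalenessCheckSketch {
  // Last modification time recorded per flow URI (mirrors lastUpdatedTimeForFlowSpec in the diff).
  final Map<String, Long> lastUpdatedTimeForFlowSpec = new HashMap<>();

  // Returns true if the incoming spec can be skipped as already scheduled.
  boolean shouldSkip(String uriString, long modificationTime, boolean runImmediately) {
    Long lastUpdated = lastUpdatedTimeForFlowSpec.get(uriString);
    // No timestamp on the incoming spec, or nothing recorded yet: treat it as the most recent.
    if (modificationTime == 0L || lastUpdated == null) {
      return false;
    }
    // A strictly newer stored version always wins.
    if (lastUpdated > modificationTime) {
      return true;
    }
    // Tie: the stored copy may be the one loaded at startup with run-immediately
    // disabled, so do not skip a spec that still asks to run immediately.
    return lastUpdated.longValue() == modificationTime && !runImmediately;
  }
}
```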





Issue Time Tracking
-------------------

    Worklog Id:     (was: 847593)
    Time Spent: 2h 40m  (was: 2.5h)

> Initialize scheduler with batch gets instead of individual get per flow
> -----------------------------------------------------------------------
>
>                 Key: GOBBLIN-1783
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1783
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> We seek to improve the initialization time of the JobScheduler upon restart or 
> leadership change by batching the MySQL queries that fetch flow specs. 
> Instead of making one MySQL get call for each flow execution id, which scales 
> extremely poorly with the number of flows, we should group them to reduce the 
> number of calls and the downtime.
> This implementation adds two new functions to the SpecStore interface, 
> getSortedSpecURIs and getBatchedSpecs, that we use to achieve the batching. 
> Because these two functions are generic enough to be used in derived 
> classes of the SpecStore, we add them to the base class. Although this 
> requires every child class to implement these functions, it allows any 
> consumer of the parent class SpecStore to use this functionality without 
> caring about the specific SpecStore implementation in use (as 
> JobScheduler does). Additionally, getBatchedSpecs requires an offset, or 
> starting point, from which to obtain each batch, so the consumer has to do 
> some bookkeeping of its position in the paginated gets, but this again 
> separates the functionality from the consumer's use case. The entirety of 
> the flow catalog is too large to load into memory for the Scheduler, so we 
> use this batch functionality. 
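
The offset bookkeeping described above can be sketched as follows. This is a minimal illustration under assumptions, not the Gobblin implementation: `PaginatedLoadSketch`, `getBatch`, and `loadAll` are hypothetical names, specs are reduced to URI strings, and the "live store" list stands in for a SpecStore whose contents may change while batches are being read.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of a consumer paging through a store with an offset and batch size.
class PaginatedLoadSketch {
  // Stand-in for a paginated get: returns up to batchSize entries starting at offset.
  static List<String> getBatch(List<String> liveStore, int offset, int batchSize) {
    int from = Math.min(offset, liveStore.size());
    int to = Math.min(offset + batchSize, liveStore.size());
    return new ArrayList<>(liveStore.subList(from, to));
  }

  // Loads every batch into 'loaded' and returns the URIs from the initial snapshot
  // that never appeared in any batch, so the caller can fetch them individually.
  static Set<String> loadAll(List<String> uriSnapshot, List<String> liveStore,
      int batchSize, List<String> loaded) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    Set<String> urisLeft = new LinkedHashSet<>(uriSnapshot);
    int offset = 0;
    while (offset < uriSnapshot.size()) {
      for (String uri : getBatch(liveStore, offset, batchSize)) {
        loaded.add(uri);
        urisLeft.remove(uri);
      }
      // The consumer, not the store, tracks its position in the paginated gets.
      offset += batchSize;
    }
    return urisLeft;
  }
}
```

The returned set plays the role of the leftover URIs: if the store's ordering shifts between batches because of insertions or deletions, anything the pages skipped is still accounted for afterwards.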



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
