ZihanLi58 commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1117801844


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -210,26 +245,67 @@ private void scheduleSpecsFromCatalog() {
         clearRunningFlowState(drUris);
       }
     } catch (IOException e) {
-      throw new RuntimeException("Failed to get the iterator of all Spec 
URIS", e);
+      throw new RuntimeException("Failed to get Spec URIs with tag to clear 
running flow state", e);
     }
 
-    while (specUris.hasNext()) {
-      Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
-      try {
-        // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership 
change if the property is set to true
-        if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean((
-            (FlowSpec) spec).getConfigAsProperties(), 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
-          Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) 
spec);
-          onAddSpec(modifiedSpec);
-        } else {
-          onAddSpec(spec);
+    int startOffset = 0;
+    long batchGetStartTime;
+    long batchGetEndTime;
+
+    while (startOffset < numSpecs) {
+      batchGetStartTime  = System.currentTimeMillis();
+      Collection<Spec> batchOfSpecs = 
this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
+      Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
+      batchGetEndTime = System.currentTimeMillis();
+
+      while (batchOfSpecsIterator.hasNext()) {
+        Spec spec = batchOfSpecsIterator.next();
+        try {
+          // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership 
change if the property is set to true
+          if (spec instanceof FlowSpec && PropertiesUtils
+              .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
+                  "false")) {
+            Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) 
spec);
+            onAddSpec(modifiedSpec);
+          } else {
+            onAddSpec(spec);
+          }
+          urisLeftToSchedule.remove(spec.getUri());
+        } catch (Exception e) {
+          // If there is an uncaught error thrown during compilation, log it 
and continue adding flows
+          _log.error("Could not schedule spec {} from flowCatalog due to ", 
spec, e);
         }
-      } catch (Exception e) {
-        // If there is an uncaught error thrown during compilation, log it and 
continue adding flows
-        _log.error("Could not schedule spec {} from flowCatalog due to ", 
spec, e);
       }
+      startOffset += this.loadSpecsBatchSize;
+      // This count is used to ensure the average spec get time is calculated 
accurately for the last batch which may be
+      // smaller than the loadSpecsBatchSize
+      averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) / 
batchOfSpecs.size();
     }
+
+    // Ensure we did not miss any specs due to ordering changing 
(deletions/insertions) while loading
+    Iterator<URI> urisLeft = urisLeftToSchedule.iterator();
+    while (urisLeft.hasNext()) {
+        URI uri = urisLeft.next();
+        try {
+          Spec spec = this.flowCatalog.get().getSpecs(uri);
+          // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership 
change if the property is set to true
+          if (spec instanceof FlowSpec && PropertiesUtils

Review Comment:
   Duplicate code; can we extract it into a single method?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -210,26 +245,67 @@ private void scheduleSpecsFromCatalog() {
         clearRunningFlowState(drUris);
       }
     } catch (IOException e) {
-      throw new RuntimeException("Failed to get the iterator of all Spec 
URIS", e);
+      throw new RuntimeException("Failed to get Spec URIs with tag to clear 
running flow state", e);
     }
 
-    while (specUris.hasNext()) {
-      Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
-      try {
-        // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership 
change if the property is set to true
-        if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean((
-            (FlowSpec) spec).getConfigAsProperties(), 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
-          Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) 
spec);
-          onAddSpec(modifiedSpec);
-        } else {
-          onAddSpec(spec);
+    int startOffset = 0;
+    long batchGetStartTime;
+    long batchGetEndTime;
+
+    while (startOffset < numSpecs) {
+      batchGetStartTime  = System.currentTimeMillis();
+      Collection<Spec> batchOfSpecs = 
this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
+      Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
+      batchGetEndTime = System.currentTimeMillis();
+
+      while (batchOfSpecsIterator.hasNext()) {
+        Spec spec = batchOfSpecsIterator.next();
+        try {
+          // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership 
change if the property is set to true
+          if (spec instanceof FlowSpec && PropertiesUtils
+              .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
+                  "false")) {
+            Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) 
spec);
+            onAddSpec(modifiedSpec);
+          } else {
+            onAddSpec(spec);
+          }
+          urisLeftToSchedule.remove(spec.getUri());
+        } catch (Exception e) {
+          // If there is an uncaught error thrown during compilation, log it 
and continue adding flows
+          _log.error("Could not schedule spec {} from flowCatalog due to ", 
spec, e);
         }
-      } catch (Exception e) {
-        // If there is an uncaught error thrown during compilation, log it and 
continue adding flows
-        _log.error("Could not schedule spec {} from flowCatalog due to ", 
spec, e);
       }
+      startOffset += this.loadSpecsBatchSize;
+      // This count is used to ensure the average spec get time is calculated 
accurately for the last batch which may be
+      // smaller than the loadSpecsBatchSize
+      averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) / 
batchOfSpecs.size();
     }
+
+    // Ensure we did not miss any specs due to ordering changing 
(deletions/insertions) while loading
+    Iterator<URI> urisLeft = urisLeftToSchedule.iterator();
+    while (urisLeft.hasNext()) {
+        URI uri = urisLeft.next();
+        try {
+          Spec spec = this.flowCatalog.get().getSpecs(uri);

Review Comment:
   Use getSpecWrapper? It's possible that it's an ad hoc flow that we delete 
after execution.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +418,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the 
scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the 
existing flow and reschedule it unnecessarily
+    Long modificationTime = 
Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.modificationTimeKey,
 "0"));
+    String uriString = flowSpec.getUri().toString();
+    // If the modification time is 0 (which means the original API was used to 
retrieve spec or warm standby mode is not
+    // enabled), spec not in scheduler, or have a modification time associated 
with it assume it's the most recent
+    if (modificationTime != 0L && 
this.scheduledFlowSpecs.containsKey(uriString)
+        && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+      if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {

Review Comment:
   This might be a super corner case, but if the modification time is the same, 
you might want to check whether the flow is run-immediately, as the load method 
overwrites that value, which might cause the first run of the flow to be skipped.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to