ZihanLi58 commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1115178711


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +412,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+    Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(MysqlBaseSpecStore.modificationTimeKey, "0"));
+    if (this.isSchedulingSpecsFromCatalog()) {
+      String uriString = flowSpec.getUri().toString();
+      // If spec does not exist in scheduler or have a modification time associated with it, assume it's the most recent
+      if (this.scheduledFlowSpecs.containsKey(uriString) && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+        if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {

Review Comment:
   If the modification time is 0, meaning multi-leader mode is not enabled, I 
believe you want to process the spec anyway.
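
   To make the suggestion concrete, here is a minimal sketch of the guard with the 0 case handled first. It is an illustration only, not the code in the PR: the class `StaleSpecCheck`, the method `shouldProcess`, and the constant `NO_MODIFICATION_TIME` are hypothetical names, and a plain `Map<String, Long>` stands in for `lastUpdatedTimeForFlowSpec`.

```java
import java.util.HashMap;
import java.util.Map;

final class StaleSpecCheck {
  // Hypothetical sentinel: mirrors the "0" default used in the diff when the
  // modification-time property is absent from the spec's config.
  static final long NO_MODIFICATION_TIME = 0L;

  /** Returns true if the incoming spec should be scheduled or rescheduled. */
  static boolean shouldProcess(Map<String, Long> lastUpdatedTimeForFlowSpec, String uri, long modificationTime) {
    // No timestamp on the spec: there is nothing to compare, so always process it.
    if (modificationTime == NO_MODIFICATION_TIME) {
      return true;
    }
    Long lastSeen = lastUpdatedTimeForFlowSpec.get(uri);
    // Unknown spec, or a strictly newer update: process it. Otherwise it is a repeat and can be skipped.
    return lastSeen == null || lastSeen < modificationTime;
  }

  public static void main(String[] args) {
    Map<String, Long> lastUpdated = new HashMap<>();
    lastUpdated.put("flow:/group/name", 100L);

    System.out.println(shouldProcess(lastUpdated, "flow:/group/name", 0L));   // no timestamp
    System.out.println(shouldProcess(lastUpdated, "flow:/group/name", 100L)); // same update seen again
    System.out.println(shouldProcess(lastUpdated, "flow:/group/name", 150L)); // newer update
    System.out.println(shouldProcess(lastUpdated, "flow:/group/other", 50L)); // spec not seen before
  }
}
```

   Checking the sentinel before the map lookup keeps the "no timestamp" path independent of whatever happens to be stored for that URI.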



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -210,26 +239,67 @@ private void scheduleSpecsFromCatalog() {
         clearRunningFlowState(drUris);
       }
     } catch (IOException e) {
-      throw new RuntimeException("Failed to get the iterator of all Spec URIS", e);
+      throw new RuntimeException("Failed to get Spec URIs with tag to clear running flow state", e);
     }
 
-    while (specUris.hasNext()) {
-      Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
-      try {
-        // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
-        if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean((
-            (FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
-          Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
-          onAddSpec(modifiedSpec);
-        } else {
-          onAddSpec(spec);
+    int startOffset = 0;
+    long batchGetStartTime;
+    long batchGetEndTime;
+
+    while (startOffset < numSpecs) {
+      batchGetStartTime  = System.currentTimeMillis();
+      Collection<Spec> batchOfSpecs = this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
+      Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
+      batchGetEndTime = System.currentTimeMillis();
+
+      while (batchOfSpecsIterator.hasNext()) {
+        Spec spec = batchOfSpecsIterator.next();
+        try {
+          // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+          if (spec instanceof FlowSpec && PropertiesUtils
+              .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
+                  "false")) {
+            Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+            onAddSpec(modifiedSpec);
+          } else {
+            onAddSpec(spec);
+          }
+        } catch (Exception e) {
+          // If there is an uncaught error thrown during compilation, log it and continue adding flows
+          _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
+        }
+      }
+      startOffset += this.loadSpecsBatchSize;
+      // This count is used to ensure the average spec get time is calculated accurately for the last batch which may be
+      // smaller than the loadSpecsBatchSize
+      averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) / batchOfSpecs.size();
+    }
+
+    // Ensure no specs were more specs were added during the load time
+    int updatedNumSpecs = flowCatalog.get().getSize();

Review Comment:
   As we discussed before, this won't solve the issue if a spec is deleted 
while the specs are being loaded. Am I missing anything? Why not just use a 
set to track which specs have already been handled?
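
   A rough sketch of the set-based idea, under stated assumptions: spec URIs are modeled as plain strings, `getPage` is a hypothetical stand-in for the paginated catalog read (it is not the real `FlowCatalog` API), and the `afterFirstBatch` hook exists only to simulate a delete that lands between two batches. When a row is deleted mid-load, later rows shift down by one and an offset-based read skips a spec, while the total count goes down rather than up, so a size comparison would not notice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class HandledSpecTracker {
  /** Hypothetical paginated read over an in-memory catalog; returns an empty list past the end. */
  static List<String> getPage(List<String> catalog, int offset, int batchSize) {
    int from = Math.min(offset, catalog.size());
    int to = Math.min(offset + batchSize, catalog.size());
    return new ArrayList<>(catalog.subList(from, to));
  }

  /**
   * Loads the catalog in batches while recording every handled URI, then makes one
   * reconciliation pass over the current catalog to pick up anything the offsets skipped.
   */
  static Set<String> loadAll(List<String> catalog, int batchSize, Runnable afterFirstBatch) {
    Set<String> handled = new LinkedHashSet<>();
    int offset = 0;
    boolean firstBatch = true;
    while (true) {
      List<String> page = getPage(catalog, offset, batchSize);
      if (page.isEmpty()) {
        break;
      }
      handled.addAll(page); // in the scheduler this is where each spec would be scheduled
      offset += batchSize;
      if (firstBatch) {
        afterFirstBatch.run(); // simulates a concurrent add or delete
        firstBatch = false;
      }
    }
    // Reconciliation: Set.add returns true only for URIs that were not handled yet.
    for (String uri : catalog) {
      if (handled.add(uri)) {
        System.out.println("Picked up skipped spec: " + uri);
      }
    }
    return handled;
  }

  public static void main(String[] args) {
    List<String> catalog = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
    // Deleting "a" after the first batch shifts "c" into the range that was already read.
    Set<String> handled = loadAll(catalog, 2, () -> catalog.remove("a"));
    System.out.println(handled);
  }
}
```

   In this sketch the deleted spec "a" stays in the handled set; cleaning it up is assumed to be the job of the normal delete path, not of the loader.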



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +412,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+    Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(MysqlBaseSpecStore.modificationTimeKey, "0"));
+    if (this.isSchedulingSpecsFromCatalog()) {

Review Comment:
   Can we do this comparison all the time, even outside the loading period? 
What is the concern that keeps us from doing that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.