This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d6b5c7920 fix add spec and actual number flows scheduled count (#3660)
d6b5c7920 is described below

commit d6b5c79205133c7ef3898fb0b04339a57b33b36c
Author: umustafi <[email protected]>
AuthorDate: Fri Mar 10 17:11:28 2023 -0800

    fix add spec and actual number flows scheduled count (#3660)
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../scheduler/GobblinServiceJobScheduler.java      | 23 +++++++++++++---------
 1 file changed, 14 insertions(+), 9 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 53713fe35..5ee5f9789 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -247,8 +247,9 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
     }
   }
 
-  /** Check that a spec should be scheduled and if it is, modify the spec of 
an adhoc flow before adding to scheduler*/
-  private void addSpecHelperMethod(Spec spec) {
+  /** Return true if a spec should be scheduled and if it is, modify the spec 
of an adhoc flow before adding to
+   *  scheduler. Return false otherwise. */
+  private boolean addSpecHelperMethod(Spec spec) {
     // Adhoc flows will not have any job schedule key, but we should schedule 
them
     if (spec instanceof FlowSpec) {
       FlowSpec flowSpec = (FlowSpec) spec;
@@ -261,10 +262,12 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
         } else {
           onAddSpec(spec);
         }
+        return true;
       }
     }else {
       _log.debug("Not scheduling spec {} during startup as next job to 
schedule is outside of threshold.", spec);
     }
+    return false;
   }
 
   /**
@@ -342,9 +345,10 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
       while (batchOfSpecsIterator.hasNext()) {
         Spec spec = batchOfSpecsIterator.next();
         try {
-          addSpecHelperMethod(spec);
-          totalAddSpecTime += this.eachCompleteAddSpecValue; // this is 
updated by each call to onAddSpec
-          actualNumFlowsScheduled += 1;
+          if (addSpecHelperMethod(spec)) {
+            totalAddSpecTime += this.eachCompleteAddSpecValue; // this is 
updated by each call to onAddSpec
+            actualNumFlowsScheduled += 1;
+          }
         } catch (Exception e) {
           // If there is an uncaught error thrown during compilation, log it 
and continue adding flows
           _log.error("Could not schedule spec {} from flowCatalog due to ", 
spec, e);
@@ -369,12 +373,13 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
           Spec spec = this.flowCatalog.get().getSpecWrapper(uri);
           this.individualGetSpecSpeedValue = System.nanoTime() - 
individualGetSpecStartTime;
           totalGetTime += this.individualGetSpecSpeedValue;
-          addSpecHelperMethod(spec);
-          totalAddSpecTime += this.eachCompleteAddSpecValue; // this is 
updated by each call to onAddSpec
-          actualNumFlowsScheduled += 1;
+          if (addSpecHelperMethod(spec)) {
+            totalAddSpecTime += this.eachCompleteAddSpecValue; // this is 
updated by each call to onAddSpec
+            actualNumFlowsScheduled += 1;
+          }
         } catch (Exception e) {
           // If there is an uncaught error thrown during compilation, log it 
and continue adding flows
-          _log.error("Could not schedule spec uri {} from flowCatalog due to 
", uri, e);
+          _log.error("Could not schedule spec uri {} from flowCatalog due to 
{}", uri, e);
         }
     }
     // Reset value after its last value to get an accurate reading

Reply via email to