This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d6b5c7920 fix add spec and actual number flows scheduled count (#3660)
d6b5c7920 is described below
commit d6b5c79205133c7ef3898fb0b04339a57b33b36c
Author: umustafi <[email protected]>
AuthorDate: Fri Mar 10 17:11:28 2023 -0800
fix add spec and actual number flows scheduled count (#3660)
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../scheduler/GobblinServiceJobScheduler.java | 23 +++++++++++++---------
1 file changed, 14 insertions(+), 9 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 53713fe35..5ee5f9789 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -247,8 +247,9 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
}
}
- /** Check that a spec should be scheduled and if it is, modify the spec of
an adhoc flow before adding to scheduler*/
- private void addSpecHelperMethod(Spec spec) {
+ /** Return true if a spec should be scheduled and if it is, modify the spec
of an adhoc flow before adding to
+ * scheduler. Return false otherwise. */
+ private boolean addSpecHelperMethod(Spec spec) {
// Adhoc flows will not have any job schedule key, but we should schedule
them
if (spec instanceof FlowSpec) {
FlowSpec flowSpec = (FlowSpec) spec;
@@ -261,10 +262,12 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
} else {
onAddSpec(spec);
}
+ return true;
}
}else {
_log.debug("Not scheduling spec {} during startup as next job to
schedule is outside of threshold.", spec);
}
+ return false;
}
/**
@@ -342,9 +345,10 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
while (batchOfSpecsIterator.hasNext()) {
Spec spec = batchOfSpecsIterator.next();
try {
- addSpecHelperMethod(spec);
- totalAddSpecTime += this.eachCompleteAddSpecValue; // this is
updated by each call to onAddSpec
- actualNumFlowsScheduled += 1;
+ if (addSpecHelperMethod(spec)) {
+ totalAddSpecTime += this.eachCompleteAddSpecValue; // this is
updated by each call to onAddSpec
+ actualNumFlowsScheduled += 1;
+ }
} catch (Exception e) {
// If there is an uncaught error thrown during compilation, log it
and continue adding flows
_log.error("Could not schedule spec {} from flowCatalog due to ",
spec, e);
@@ -369,12 +373,13 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
Spec spec = this.flowCatalog.get().getSpecWrapper(uri);
this.individualGetSpecSpeedValue = System.nanoTime() -
individualGetSpecStartTime;
totalGetTime += this.individualGetSpecSpeedValue;
- addSpecHelperMethod(spec);
- totalAddSpecTime += this.eachCompleteAddSpecValue; // this is
updated by each call to onAddSpec
- actualNumFlowsScheduled += 1;
+ if (addSpecHelperMethod(spec)) {
+ totalAddSpecTime += this.eachCompleteAddSpecValue; // this is
updated by each call to onAddSpec
+ actualNumFlowsScheduled += 1;
+ }
} catch (Exception e) {
// If there is an uncaught error thrown during compilation, log it
and continue adding flows
- _log.error("Could not schedule spec uri {} from flowCatalog due to
", uri, e);
+ _log.error("Could not schedule spec uri {} from flowCatalog due to
{}", uri, e);
}
}
// Reset value after its last value to get an accurate reading