[
https://issues.apache.org/jira/browse/GOBBLIN-1797?focusedWorklogId=849946&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-849946
]
ASF GitHub Bot logged work on GOBBLIN-1797:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 09/Mar/23 02:09
Start Date: 09/Mar/23 02:09
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3656:
URL: https://github.com/apache/gobblin/pull/3656#discussion_r1130259453
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -108,24 +113,31 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
@Getter
protected final Map<String, Long> lastUpdatedTimeForFlowSpec;
protected volatile int loadSpecsBatchSize = -1;
+ protected int thresholdToSkipSchedulingFlowsAfter;
@Getter
private volatile boolean isActive;
private String serviceName;
- private volatile Long averageGetSpecTimeValue = -1L;
+ private volatile Long perSpecGetRateValue = -1L;
private volatile Long timeToInitializeSchedulerValue = -1L;
- private volatile Long timeToObtainSpecUrisNanosValue = -1L;
- private volatile Long individualGetSpecSpeedNanosValue = -1L;
- private volatile Long addSpecTimeNanosValue = -1L;
- private volatile Long flowCompilationTimeNanosValue = -1L;
- private volatile Long timeToScheduleOneJobValue = -1L;
- private final ContextAwareGauge averageGetSpecTimeNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_NANOS, () -> this.averageGetSpecTimeValue);;
+ private volatile Long timeToObtainSpecUrisValue = -1L;
+ private volatile Long individualGetSpecSpeedValue = -1L;
+ private volatile Long eachCompleteAddSpecValue = -1L;
+ private volatile Long eachSpecCompilationValue = -1L;
+ private volatile Long eachScheduleJobValue = -1L;
+ private volatile Long totalGetSpecTimeValue = -1L;
+ private volatile Long totalAddSpecTimeValue = -1L;
+ private volatile int numJobsScheduledDuringStartupValue = -1;
+ private final ContextAwareGauge getSpecsPerSpecRateNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_GET_SPECS_DURING_STARTUP_PER_SPEC_RATE_NANOS, () -> this.perSpecGetRateValue);;
Review Comment:
extra semi
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -162,22 +174,26 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
this.scheduledFlowSpecs = Maps.newHashMap();
this.lastUpdatedTimeForFlowSpec = Maps.newHashMap();
this.loadSpecsBatchSize = Integer.parseInt(ConfigUtils.configToProperties(config).getProperty(ConfigurationKeys.LOAD_SPEC_BATCH_SIZE, String.valueOf(ConfigurationKeys.DEFAULT_LOAD_SPEC_BATCH_SIZE)));
+ this.thresholdToSkipSchedulingFlowsAfter = Integer.parseInt(ConfigUtils.configToProperties(config).getProperty(ConfigurationKeys.SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS, String.valueOf(ConfigurationKeys.DEFAULT_NUM_DAYS_TO_SKIP_AFTER)));
Review Comment:
since this is a number of days, let's name the variable accordingly
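A minimal sketch of the rename, assuming the value is read from the `java.util.Properties` returned by `ConfigUtils.configToProperties(config)` as in the hunk above. The holder class and the field name `numDaysToSkipSchedulingAfter` are hypothetical; the key string and default are copied from this PR's `ConfigurationKeys` hunk.

```java
import java.util.Properties;

// Hypothetical holder showing a threshold field whose name carries its unit (days).
final class StartupSchedulingConfig {
  // Key and default copied from the ConfigurationKeys hunk in this PR.
  static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days";
  static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 100;

  // Flows whose next run is at least this many days away are not scheduled on startup.
  final int numDaysToSkipSchedulingAfter;

  StartupSchedulingConfig(Properties props) {
    this.numDaysToSkipSchedulingAfter = Integer.parseInt(props.getProperty(
        SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS, String.valueOf(DEFAULT_NUM_DAYS_TO_SKIP_AFTER)));
  }
}
```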
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -231,17 +247,59 @@ public void run() {
}
}
- /** Helps modify spec before adding to scheduler for adhoc flows */
+ /** Check that a spec should be scheduled and if it is, modify the spec of an adhoc flow before adding to scheduler*/
private void addSpecHelperMethod(Spec spec) {
- // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
- if (spec instanceof FlowSpec && PropertiesUtils
- .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
- "false")) {
- Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
- onAddSpec(modifiedSpec);
+ // Adhoc flows will not have any job schedule key, but we should schedule them
+ FlowSpec flowSpec = (FlowSpec) spec;
Review Comment:
previously there was an `instanceof` check guarding the cast... are we
absolutely certain RTTI will succeed? even if so, best practice would be to
code defensively...
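One defensive shape for the guard, sketched with minimal stand-in `Spec` and `FlowSpec` types (hypothetical; the real Gobblin types carry much more). A non-`FlowSpec` falls through to the pre-PR behavior instead of raising `ClassCastException`, and the boolean `isNextRunWithinThreshold` stands in for the schedule-key and threshold check in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Gobblin's Spec / FlowSpec, only to make the guard compilable.
interface Spec {
  String getUri();
}

final class FlowSpec implements Spec {
  private final String uri;
  private final boolean nextRunWithinThreshold;

  FlowSpec(String uri, boolean nextRunWithinThreshold) {
    this.uri = uri;
    this.nextRunWithinThreshold = nextRunWithinThreshold;
  }

  public String getUri() { return uri; }

  // Stand-in for the JOB_SCHEDULE_KEY / isNextRunWithinRangeToSchedule check.
  boolean isNextRunWithinThreshold() { return nextRunWithinThreshold; }
}

final class DefensiveAddSpec {
  final List<String> added = new ArrayList<>();

  void onAddSpec(Spec spec) { added.add(spec.getUri()); }

  void addSpecHelperMethod(Spec spec) {
    if (!(spec instanceof FlowSpec)) {
      // Not a FlowSpec: keep the old behavior rather than failing the cast.
      onAddSpec(spec);
      return;
    }
    FlowSpec flowSpec = (FlowSpec) spec; // safe: guarded by the instanceof check above
    if (flowSpec.isNextRunWithinThreshold()) {
      onAddSpec(flowSpec);
    }
  }
}
```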
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -231,17 +247,59 @@ public void run() {
}
}
- /** Helps modify spec before adding to scheduler for adhoc flows */
+ /** Check that a spec should be scheduled and if it is, modify the spec of an adhoc flow before adding to scheduler*/
private void addSpecHelperMethod(Spec spec) {
- // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
- if (spec instanceof FlowSpec && PropertiesUtils
- .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
- "false")) {
- Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
- onAddSpec(modifiedSpec);
+ // Adhoc flows will not have any job schedule key, but we should schedule them
+ FlowSpec flowSpec = (FlowSpec) spec;
+ if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)
+ || isNextRunWithinRangeToSchedule(flowSpec.getConfig().getString(ConfigurationKeys.JOB_SCHEDULE_KEY),
+ this.thresholdToSkipSchedulingFlowsAfter)) {
+ // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+ if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(),
+ ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+ Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+ onAddSpec(modifiedSpec);
+ } else {
+ onAddSpec(spec);
+ }
} else {
- onAddSpec(spec);
+ _log.info("Not scheduling spec {} during startup as next job to schedule is outside of threshold.", spec);
Review Comment:
seems noisy to log >100k lines at info level.
incrementing `numJobsScheduledDuringStartupValue` must apparently occur some
ways away from this code... (a counter like that feels like the more appropriate
substitute for tracking the skipped flows)
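A sketch of the reviewer's alternative: count skipped specs where the diff currently logs each one, then emit a single summary after the startup loop. The class and method names are hypothetical, and exposing the skipped count through a gauge next to `numJobsScheduledDuringStartupValue` is an assumption about how it would be reported.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical tracker: replaces one info line per skipped spec with counters.
final class StartupSkipTracker {
  private final AtomicInteger numSkipped = new AtomicInteger();
  private final AtomicInteger numScheduled = new AtomicInteger();

  void recordScheduled() { numScheduled.incrementAndGet(); }

  // Called where the diff currently logs "Not scheduling spec ...".
  void recordSkipped() { numSkipped.incrementAndGet(); }

  int getNumSkipped() { return numSkipped.get(); }

  // One summary line after the loop, instead of potentially >100k lines.
  String summary() {
    return String.format("Startup scheduling done: %d scheduled, %d skipped (next run outside threshold)",
        numScheduled.get(), numSkipped.get());
  }
}
```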
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -251,9 +309,14 @@ private void addSpecHelperMethod(Spec spec) {
* If it is newly brought up as the DR handler, will load additional FlowSpecs and handle transition properly.
*/
private void scheduleSpecsFromCatalog() {
+ // TODO: clean up metrics after bottleneck is determined for startup to only keep most important ones
Review Comment:
if helpful to measure now, likely helpful to retain...
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -292,17 +355,23 @@ private void scheduleSpecsFromCatalog() {
Spec spec = batchOfSpecsIterator.next();
try {
addSpecHelperMethod(spec);
- urisLeftToSchedule.remove(spec.getUri());
+ totalAddSpecTime += this.eachCompleteAddSpecValue; // this is updated by each call to onAddSpec
+ actualNumFlowsScheduled += 1;
} catch (Exception e) {
// If there is an uncaught error thrown during compilation, log it and continue adding flows
_log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
}
+ urisLeftToSchedule.remove(spec.getUri());
}
startOffset += this.loadSpecsBatchSize;
- // This count is used to ensure the average spec get time is calculated accurately for the last batch which may be
- // smaller than the loadSpecsBatchSize
- averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) / batchOfSpecs.size();
+ totalGetTime += batchGetEndTime - batchGetStartTime;
+ // Don't skew the average get spec time value with the last batch that may be very small
+ if (batchOfSpecs.size() == this.loadSpecsBatchSize) {
Review Comment:
pretty strict...
* maybe log always if `startOffset == 0` (to account for total num records <
batch size)
* perhaps so long as size() > round(0.75 * batchSize) ?
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -90,7 +90,9 @@ public class ConfigurationKeys {
public static final String JOB_RETRIGGERING_ENABLED = "job.retriggering.enabled";
public static final String DEFAULT_JOB_RETRIGGERING_ENABLED = "true";
public static final String LOAD_SPEC_BATCH_SIZE = "load.spec.batch.size";
- public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 100;
+ public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
+ public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days";
+ public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 100;
Review Comment:
my $0.02 -
sounds like 100d vs 365d shouldn't materially change how many flows get
scheduled... but it would definitely extend the timeframe to accomplish
redeployment, which is clearly safer in terms of ensuring the presumed
redeployment actually happens in time. hence I vote also for 1 year.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -292,17 +355,23 @@ private void scheduleSpecsFromCatalog() {
Spec spec = batchOfSpecsIterator.next();
try {
addSpecHelperMethod(spec);
- urisLeftToSchedule.remove(spec.getUri());
+ totalAddSpecTime += this.eachCompleteAddSpecValue; // this is updated by each call to onAddSpec
+ actualNumFlowsScheduled += 1;
} catch (Exception e) {
// If there is an uncaught error thrown during compilation, log it and continue adding flows
_log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
}
+ urisLeftToSchedule.remove(spec.getUri());
}
startOffset += this.loadSpecsBatchSize;
- // This count is used to ensure the average spec get time is calculated accurately for the last batch which may be
- // smaller than the loadSpecsBatchSize
- averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) / batchOfSpecs.size();
+ totalGetTime += batchGetEndTime - batchGetStartTime;
+ // Don't skew the average get spec time value with the last batch that may be very small
+ if (batchOfSpecs.size() == this.loadSpecsBatchSize) {
+ perSpecGetRateValue = (batchGetEndTime - batchGetStartTime) / batchOfSpecs.size();
+ }
}
+ // Reset value after its last value to get an accurate reading
+ perSpecGetRateValue = -1L;
Review Comment:
if other metrics are to be reset too, let's do them all together at the end.
ideally we'd even perform in a timer callback 1 minute in the future, to
ensure the value is finished emitting before being cleared...
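A sketch of resetting the startup gauges together from a delayed task, assuming the backing values are `volatile` fields like those in the diff (shown here as primitives) and that a one-minute delay, the reviewer's suggestion, outlasts one metrics-reporting interval. The class and method names are hypothetical; the scheduler would call something like `scheduleReset(1, TimeUnit.MINUTES)` once startup scheduling finishes.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical holder for startup gauge values; names mirror a few fields from the diff.
final class StartupMetricsResetter {
  volatile long perSpecGetRateValue = -1L;
  volatile long totalGetSpecTimeValue = -1L;
  volatile long totalAddSpecTimeValue = -1L;
  volatile int numJobsScheduledDuringStartupValue = -1;

  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
    Thread t = new Thread(r, "startup-metrics-reset");
    t.setDaemon(true); // don't keep the JVM alive just for this reset
    return t;
  });

  // Reset every startup gauge together, after a delay so the final values are emitted first.
  ScheduledFuture<?> scheduleReset(long delay, TimeUnit unit) {
    return executor.schedule(this::resetAll, delay, unit);
  }

  void resetAll() {
    perSpecGetRateValue = -1L;
    totalGetSpecTimeValue = -1L;
    totalAddSpecTimeValue = -1L;
    numJobsScheduledDuringStartupValue = -1;
  }
}
```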
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -231,17 +247,59 @@ public void run() {
}
}
- /** Helps modify spec before adding to scheduler for adhoc flows */
+ /** Check that a spec should be scheduled and if it is, modify the spec of an adhoc flow before adding to scheduler*/
private void addSpecHelperMethod(Spec spec) {
- // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
- if (spec instanceof FlowSpec && PropertiesUtils
- .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
- "false")) {
- Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
- onAddSpec(modifiedSpec);
+ // Adhoc flows will not have any job schedule key, but we should schedule them
+ FlowSpec flowSpec = (FlowSpec) spec;
+ if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)
+ || isNextRunWithinRangeToSchedule(flowSpec.getConfig().getString(ConfigurationKeys.JOB_SCHEDULE_KEY),
+ this.thresholdToSkipSchedulingFlowsAfter)) {
+ // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+ if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(),
+ ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+ Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+ onAddSpec(modifiedSpec);
+ } else {
+ onAddSpec(spec);
+ }
} else {
- onAddSpec(spec);
+ _log.info("Not scheduling spec {} during startup as next job to schedule is outside of threshold.", spec);
+ }
+ }
+
+ /**
+ * Given a cron expression calculates the time for next run in days from current time, rounding up to the nearest day.
+ * @param cronExpression
+ * @return num days until next run, max integer in the case it cannot be calculated
+ */
+ public static int nextRunInDays(String cronExpression) {
+ CronExpression cron = null;
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ double numMillisInADay = 86400000;
+ try {
+ cron = new CronExpression(cronExpression);
+ cron.setTimeZone(TimeZone.getTimeZone("UTC"));
+ Date now = new Date();
+ Date nextValidTimeAfter = cron.getNextValidTimeAfter(now);
+ cal.setTime(nextValidTimeAfter);
+ long diff = cal.getTimeInMillis() - System.currentTimeMillis();
+ double diffInDays = diff / numMillisInADay;
+ return (int) Math.round(diffInDays);
+ } catch (ParseException e) {
+ e.printStackTrace();
}
+ return -1;
+ }
+
+ /**
+ * Returns true if next run for the given cron schedule is sooner than the threshold to skip scheduling after, false
+ * otherwise. If the cron expression cannot be parsed and the next run cannot be calculated returns true to schedule.
+ * @param cronExpression
+ * @param thresholdToSkipScheduling represents number of days
+ */
+ public static boolean isNextRunWithinRangeToSchedule(String cronExpression, int thresholdToSkipScheduling) {
Review Comment:
`@VisibleForTesting`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -231,17 +247,59 @@ public void run() {
}
}
- /** Helps modify spec before adding to scheduler for adhoc flows */
+ /** Check that a spec should be scheduled and if it is, modify the spec of an adhoc flow before adding to scheduler*/
private void addSpecHelperMethod(Spec spec) {
- // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
- if (spec instanceof FlowSpec && PropertiesUtils
- .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
- "false")) {
- Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
- onAddSpec(modifiedSpec);
+ // Adhoc flows will not have any job schedule key, but we should schedule them
+ FlowSpec flowSpec = (FlowSpec) spec;
+ if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)
+ || isNextRunWithinRangeToSchedule(flowSpec.getConfig().getString(ConfigurationKeys.JOB_SCHEDULE_KEY),
+ this.thresholdToSkipSchedulingFlowsAfter)) {
+ // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+ if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(),
+ ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+ Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+ onAddSpec(modifiedSpec);
+ } else {
+ onAddSpec(spec);
+ }
} else {
- onAddSpec(spec);
+ _log.info("Not scheduling spec {} during startup as next job to schedule is outside of threshold.", spec);
+ }
+ }
+
+ /**
+ * Given a cron expression calculates the time for next run in days from current time, rounding up to the nearest day.
+ * @param cronExpression
+ * @return num days until next run, max integer in the case it cannot be calculated
+ */
+ public static int nextRunInDays(String cronExpression) {
+ CronExpression cron = null;
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ double numMillisInADay = 86400000;
+ try {
+ cron = new CronExpression(cronExpression);
+ cron.setTimeZone(TimeZone.getTimeZone("UTC"));
+ Date now = new Date();
+ Date nextValidTimeAfter = cron.getNextValidTimeAfter(now);
+ cal.setTime(nextValidTimeAfter);
+ long diff = cal.getTimeInMillis() - System.currentTimeMillis();
+ double diffInDays = diff / numMillisInADay;
+ return (int) Math.round(diffInDays);
+ } catch (ParseException e) {
+ e.printStackTrace();
}
+ return -1;
+ }
+
+ /**
+ * Returns true if next run for the given cron schedule is sooner than the threshold to skip scheduling after, false
+ * otherwise. If the cron expression cannot be parsed and the next run cannot be calculated returns true to schedule.
+ * @param cronExpression
+ * @param thresholdToSkipScheduling represents number of days
+ */
+ public static boolean isNextRunWithinRangeToSchedule(String cronExpression, int thresholdToSkipScheduling) {
+ int days = nextRunInDays(cronExpression);
+ return days < thresholdToSkipScheduling;
Review Comment:
this and its helper seem possible to streamline; e.g:
```
boolean isWithinRange(...) {
  CronExpression cron = ...
  // both sides of the comparison are epoch millis (Date.getTime() returns millis, not seconds)
  long nextCronEventAsEpochMillis = cron.getNextValidTimeAfter(new Date()).getTime();
  return nextCronEventAsEpochMillis <= Instant.now().plus(maxNumDaysToScheduleWithin, ChronoUnit.DAYS).toEpochMilli();
}
```
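A self-contained sketch of the streamlined check. It assumes the caller has already obtained the next fire time from Quartz's `CronExpression.getNextValidTimeAfter(new Date())`, so only `java.time` is needed here; comparing `Instant`s directly sidesteps the millis-versus-seconds mismatch that is easy to hit when mixing `Date.getTime()` with `Instant.getEpochSecond()`. The class name, the injected `now`, and treating a missing (null) next fire time as "schedule" (mirroring the fallback documented in the PR's javadoc) are assumptions.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;

// Hypothetical helper; nextFireTime would come from CronExpression.getNextValidTimeAfter(new Date()).
final class NextRunRangeCheck {
  /**
   * Returns true if nextFireTime is at or before now + maxNumDaysToScheduleWithin days.
   * A null nextFireTime (no computable next run) is treated as "schedule".
   */
  static boolean isWithinRange(Date nextFireTime, Instant now, int maxNumDaysToScheduleWithin) {
    if (nextFireTime == null) {
      return true;
    }
    Instant cutoff = now.plus(maxNumDaysToScheduleWithin, ChronoUnit.DAYS);
    // Compare Instants directly, so no epoch-unit conversion can go wrong.
    return !nextFireTime.toInstant().isAfter(cutoff);
  }
}
```

Passing `now` explicitly, rather than calling `Instant.now()` inside, keeps the check deterministic in unit tests, for example when asserting that a flow "unscheduled" to Jan 1st, 2050 is skipped.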
Issue Time Tracking
-------------------
Worklog Id: (was: 849946)
Time Spent: 1h 10m (was: 1h)
> Skip scheduling flows far into future
> -------------------------------------
>
> Key: GOBBLIN-1797
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1797
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> The unschedule feature linked below sets a flow's schedule to Jan 1st of 2050,
> so far in advance that the flow will effectively "never run":
> [https://jarvis.corp.linkedin.com/codesearch/result/?name=FlowConfigResourceLocalHandler.java&path=gobblin-elr%2Fgobblin-restli%2Fgobblin-flow-config-service%2Fgobblin-flow-config-service-server%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fgobblin%2Fservice&reponame=linkedin%2Fgobblin-elr#62]
> Potentially there are over 100k of these flows, so we are loading and
> scheduling many unnecessary flows. On initialization we add a check that
> verifies the next run of the flow is within a certain time frame (100 days by
> default) and loads it into the scheduler only if it is within that time frame.
> We chose that default value under the assumption that we will redeploy GaaS at
> least every 100 days, so a far-out scheduled flow will be loaded into the
> scheduler by a later redeployment as its run date approaches. In most cases,
> however, users schedule flows for the near future or to run immediately, and
> those will all be scheduled. This PR also renames metrics and adds helpful new
> ones.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)