[
https://issues.apache.org/jira/browse/GOBBLIN-1797?focusedWorklogId=850167&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-850167
]
ASF GitHub Bot logged work on GOBBLIN-1797:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 09/Mar/23 19:54
Start Date: 09/Mar/23 19:54
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3656:
URL: https://github.com/apache/gobblin/pull/3656#discussion_r1131523913
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -231,17 +247,59 @@ public void run() {
}
}
- /** Helps modify spec before adding to scheduler for adhoc flows */
+ /** Check that a spec should be scheduled and if it is, modify the spec of an adhoc flow before adding to scheduler*/
private void addSpecHelperMethod(Spec spec) {
- // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
- if (spec instanceof FlowSpec && PropertiesUtils
- .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
- "false")) {
- Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
- onAddSpec(modifiedSpec);
+ // Adhoc flows will not have any job schedule key, but we should schedule them
+ FlowSpec flowSpec = (FlowSpec) spec;
+ if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)
+ || isNextRunWithinRangeToSchedule(flowSpec.getConfig().getString(ConfigurationKeys.JOB_SCHEDULE_KEY),
+ this.thresholdToSkipSchedulingFlowsAfter)) {
+ // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+ if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(),
+ ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+ Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+ onAddSpec(modifiedSpec);
+ } else {
+ onAddSpec(spec);
+ }
} else {
- onAddSpec(spec);
+ _log.info("Not scheduling spec {} during startup as next job to schedule is outside of threshold.", spec);
+ }
+ }
+
+ /**
+ * Given a cron expression calculates the time for next run in days from current time, rounding up to the nearest day.
+ * @param cronExpression
+ * @return num days until next run, max integer in the case it cannot be calculated
+ */
+ public static int nextRunInDays(String cronExpression) {
+ CronExpression cron = null;
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ double numMillisInADay = 86400000;
+ try {
+ cron = new CronExpression(cronExpression);
+ cron.setTimeZone(TimeZone.getTimeZone("UTC"));
+ Date now = new Date();
+ Date nextValidTimeAfter = cron.getNextValidTimeAfter(now);
+ cal.setTime(nextValidTimeAfter);
+ long diff = cal.getTimeInMillis() - System.currentTimeMillis();
+ double diffInDays = diff / numMillisInADay;
+ return (int) Math.round(diffInDays);
+ } catch (ParseException e) {
+ e.printStackTrace();
}
+ return -1;
+ }
+
+ /**
+ * Returns true if next run for the given cron schedule is sooner than the threshold to skip scheduling after, false
+ * otherwise. If the cron expression cannot be parsed and the next run cannot be calculated returns true to schedule.
+ * @param cronExpression
+ * @param thresholdToSkipScheduling represents number of days
+ */
+ public static boolean isNextRunWithinRangeToSchedule(String cronExpression, int thresholdToSkipScheduling) {
+ int days = nextRunInDays(cronExpression);
+ return days < thresholdToSkipScheduling;
Review Comment:
Consolidated into one function and simplified a bit. It was taking longer
than I'd like to debug the changes above but the new function is more
streamlined.
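Note that the Javadoc on `nextRunInDays` says it rounds *up* and returns max integer when the next run cannot be calculated, while the code calls `Math.round` (nearest day) and returns `-1` on a `ParseException`. The `-1` sentinel is what makes `isNextRunWithinRangeToSchedule` schedule an unparseable cron, as its own Javadoc promises (max integer would do the opposite), but the rounding differs from the documented behavior: a run about 1.42 days out becomes 1, not 2. The following is a hypothetical, stdlib-only sketch, not the PR's code. It assumes the next fire time has already been computed (for example with Quartz's `CronExpression.getNextValidTimeAfter`), uses a true ceiling, and models "cannot be calculated" with an empty `OptionalLong` instead of a sentinel. The class name `NextRunDays` is made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.OptionalLong;

// Hypothetical helper, not the PR's code: day arithmetic for the startup scheduling check.
class NextRunDays {
  private static final long MILLIS_PER_DAY = Duration.ofDays(1).toMillis();

  // Whole days from now until nextRun, rounded UP (ceiling).
  // Returns OptionalLong.empty() when nextRun is unknown, e.g. the cron expression failed to parse.
  static OptionalLong nextRunInDays(Instant now, Instant nextRun) {
    if (nextRun == null) {
      return OptionalLong.empty();
    }
    long diffMillis = Duration.between(now, nextRun).toMillis();
    // -floorDiv(-x, d) is a ceiling division; Math.round would map ~1.42 days to 1 instead of 2
    return OptionalLong.of(-Math.floorDiv(-diffMillis, MILLIS_PER_DAY));
  }

  // Schedule when the next run is inside the threshold, or when it could not be computed at all.
  static boolean isNextRunWithinRangeToSchedule(OptionalLong days, int thresholdDays) {
    return !days.isPresent() || days.getAsLong() < thresholdDays;
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2023-03-09T19:54:00Z");
    Instant next = now.plus(Duration.ofHours(34)); // about 1.42 days away
    OptionalLong days = nextRunInDays(now, next);
    System.out.println(days);                                                      // OptionalLong[2]
    System.out.println(isNextRunWithinRangeToSchedule(days, 100));                 // true
    System.out.println(isNextRunWithinRangeToSchedule(OptionalLong.empty(), 100)); // true
  }
}
```

Using an explicit "unknown" value keeps the schedule-on-failure policy in one visible place (`isNextRunWithinRangeToSchedule`) rather than depending on the sign of a sentinel.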
Issue Time Tracking
-------------------
Worklog Id: (was: 850167)
Time Spent: 1h 50m (was: 1h 40m)
> Skip scheduling flows far into future
> -------------------------------------
>
> Key: GOBBLIN-1797
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1797
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> The unschedule feature linked below sets a flow's schedule to run on Jan 1st, 2050,
> so far in advance that it will "never run"
> [https://jarvis.corp.linkedin.com/codesearch/result/?name=FlowConfigResourceLocalHandler.java&path=gobblin-elr%2Fgobblin-restli%2Fgobblin-flow-config-service%2Fgobblin-flow-config-service-server%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fgobblin%2Fservice&reponame=linkedin%2Fgobblin-elr#62].
> However, there are potentially over 100k of these flows, so we are loading and
> scheduling many unnecessary flows. On initialization, we add a check that
> verifies the next run of the flow is within a certain time frame (100 days by
> default) and loads it into the scheduler only if it is. We chose that default
> under the assumption that GaaS will be redeployed at least every 100 days, so a
> flow scheduled far out will be loaded into the scheduler once its next run comes
> within range. In most cases, however, users schedule flows for the near future or
> to run immediately, and those will all be scheduled. This PR also renames metrics
> and adds helpful new ones.
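The startup check described in the issue can be sketched as a filter over the flows loaded at initialization. This is a minimal, hypothetical illustration: `StartupScheduleFilter`, `Flow`, and `flowsToLoad` are made-up names, not Gobblin APIs. It assumes each flow's next run time has already been derived from its cron schedule, treats a flow with no schedule as adhoc (always loaded, matching the missing `JOB_SCHEDULE_KEY` branch in the diff), and compares against a 100-day `Duration` directly so no day rounding is involved.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the GOBBLIN-1797 startup filter; names are illustrative only.
class StartupScheduleFilter {
  // Default look-ahead from the issue description: 100 days.
  static final Duration DEFAULT_THRESHOLD = Duration.ofDays(100);

  static final class Flow {
    final String name;
    final Optional<Instant> nextRun; // empty for adhoc flows with no schedule

    Flow(String name, Optional<Instant> nextRun) {
      this.name = name;
      this.nextRun = nextRun;
    }
  }

  // Keep adhoc flows and flows whose next run falls strictly inside the threshold; skip the rest.
  static List<String> flowsToLoad(List<Flow> flows, Instant now, Duration threshold) {
    List<String> toLoad = new ArrayList<>();
    for (Flow flow : flows) {
      boolean load = !flow.nextRun.isPresent()
          || Duration.between(now, flow.nextRun.get()).compareTo(threshold) < 0;
      if (load) {
        toLoad.add(flow.name);
      }
    }
    return toLoad;
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2023-03-09T00:00:00Z");
    List<Flow> flows = new ArrayList<>();
    flows.add(new Flow("adhoc", Optional.empty()));
    flows.add(new Flow("hourly", Optional.of(now.plus(Duration.ofHours(1)))));
    // The "unschedule" placeholder: Jan 1st 2050, 9,795 days after `now`
    flows.add(new Flow("unscheduled", Optional.of(Instant.parse("2050-01-01T00:00:00Z"))));
    System.out.println(flowsToLoad(flows, now, DEFAULT_THRESHOLD)); // [adhoc, hourly]
  }
}
```

With `now` set to 2023-03-09, the 2050 placeholder is far beyond the 100-day window and is skipped, while the adhoc and near-term flows are loaded. Under the redeploy assumption in the issue, a later restart would pick the skipped flow up once its next run falls inside the window.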
--
This message was sent by Atlassian Jira
(v8.20.10#820010)