umustafi commented on code in PR #3656:
URL: https://github.com/apache/gobblin/pull/3656#discussion_r1131523913


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -231,17 +247,59 @@ public void run() {
     }
   }
 
-  /** Helps modify spec before adding to scheduler for adhoc flows */
+  /** Check that a spec should be scheduled and if it is, modify the spec of 
an adhoc flow before adding to scheduler*/
   private void addSpecHelperMethod(Spec spec) {
-    // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if 
the property is set to true
-    if (spec instanceof FlowSpec && PropertiesUtils
-        .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
-            "false")) {
-      Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
-      onAddSpec(modifiedSpec);
+    // Adhoc flows will not have any job schedule key, but we should schedule 
them
+    FlowSpec flowSpec = (FlowSpec) spec;
+    if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)
+        || 
isNextRunWithinRangeToSchedule(flowSpec.getConfig().getString(ConfigurationKeys.JOB_SCHEDULE_KEY),
+        this.thresholdToSkipSchedulingFlowsAfter)) {
+      // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change 
if the property is set to true
+      if (spec instanceof FlowSpec && 
PropertiesUtils.getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(),
+          ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+        Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+        onAddSpec(modifiedSpec);
+      } else {
+        onAddSpec(spec);
+      }
     } else {
-      onAddSpec(spec);
+      _log.info("Not scheduling spec {} during startup as next job to schedule 
is outside of threshold.", spec);
+    }
+  }
+
+  /**
+   * Given a cron expression, calculates the time until the next run in days from 
the current time, rounded to the nearest day.
+   * @param cronExpression
+   * @return num days until next run, or -1 in the case the cron expression cannot be 
parsed
+   */
+  public static int nextRunInDays(String cronExpression) {
+    CronExpression cron = null;
+    Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+    double numMillisInADay = 86400000;
+    try {
+      cron = new CronExpression(cronExpression);
+      cron.setTimeZone(TimeZone.getTimeZone("UTC"));
+      Date now = new Date();
+      Date nextValidTimeAfter = cron.getNextValidTimeAfter(now);
+      cal.setTime(nextValidTimeAfter);
+      long diff = cal.getTimeInMillis() - System.currentTimeMillis();
+      double diffInDays = diff / numMillisInADay;
+      return (int) Math.round(diffInDays);
+    } catch (ParseException e) {
+      e.printStackTrace();
     }
+    return -1;
+  }
+
+  /**
+   * Returns true if next run for the given cron schedule is sooner than the 
threshold to skip scheduling after, false
+   * otherwise. If the cron expression cannot be parsed and the next run 
cannot be calculated returns true to schedule.
+   * @param cronExpression
+   * @param thresholdToSkipScheduling represents number of days
+   */
+  public static boolean isNextRunWithinRangeToSchedule(String cronExpression, 
int thresholdToSkipScheduling) {
+    int days = nextRunInDays(cronExpression);
+    return days < thresholdToSkipScheduling;

Review Comment:
   Consolidated into one function and simplified a bit. It was taking longer 
than I'd like to debug the changes above, but the new function is more 
streamlined.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to