[ 
https://issues.apache.org/jira/browse/GOBBLIN-1797?focusedWorklogId=850167&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-850167
 ]

ASF GitHub Bot logged work on GOBBLIN-1797:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Mar/23 19:54
            Start Date: 09/Mar/23 19:54
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3656:
URL: https://github.com/apache/gobblin/pull/3656#discussion_r1131523913


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -231,17 +247,59 @@ public void run() {
     }
   }
 
-  /** Helps modify spec before adding to scheduler for adhoc flows */
+  /** Check that a spec should be scheduled and if it is, modify the spec of 
an adhoc flow before adding to scheduler*/
   private void addSpecHelperMethod(Spec spec) {
-    // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if 
the property is set to true
-    if (spec instanceof FlowSpec && PropertiesUtils
-        .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
-            "false")) {
-      Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
-      onAddSpec(modifiedSpec);
+    // Adhoc flows will not have any job schedule key, but we should schedule 
them
+    FlowSpec flowSpec = (FlowSpec) spec;
+    if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)
+        || 
isNextRunWithinRangeToSchedule(flowSpec.getConfig().getString(ConfigurationKeys.JOB_SCHEDULE_KEY),
+        this.thresholdToSkipSchedulingFlowsAfter)) {
+      // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change 
if the property is set to true
+      if (spec instanceof FlowSpec && 
PropertiesUtils.getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(),
+          ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+        Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+        onAddSpec(modifiedSpec);
+      } else {
+        onAddSpec(spec);
+      }
     } else {
-      onAddSpec(spec);
+      _log.info("Not scheduling spec {} during startup as next job to schedule 
is outside of threshold.", spec);
+    }
+  }
+
+  /**
+   * Given a cron expression calculates the time for next run in days from 
current time, rounding up to the nearest day.
+   * @param cronExpression
+   * @return num days until next run, max integer in the case it cannot be 
calculated
+   */
+  public static int nextRunInDays(String cronExpression) {
+    CronExpression cron = null;
+    Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+    double numMillisInADay = 86400000;
+    try {
+      cron = new CronExpression(cronExpression);
+      cron.setTimeZone(TimeZone.getTimeZone("UTC"));
+      Date now = new Date();
+      Date nextValidTimeAfter = cron.getNextValidTimeAfter(now);
+      cal.setTime(nextValidTimeAfter);
+      long diff = cal.getTimeInMillis() - System.currentTimeMillis();
+      double diffInDays = diff / numMillisInADay;
+      return (int) Math.round(diffInDays);
+    } catch (ParseException e) {
+      e.printStackTrace();
     }
+    return -1;
+  }
+
+  /**
+   * Returns true if next run for the given cron schedule is sooner than the 
threshold to skip scheduling after, false
+   * otherwise. If the cron expression cannot be parsed and the next run 
cannot be calculated returns true to schedule.
+   * @param cronExpression
+   * @param thresholdToSkipScheduling represents number of days
+   */
+  public static boolean isNextRunWithinRangeToSchedule(String cronExpression, 
int thresholdToSkipScheduling) {
+    int days = nextRunInDays(cronExpression);
+    return days < thresholdToSkipScheduling;

Review Comment:
   Consolidated into one function and simplified a bit. It was taking longer 
than I'd like to debug the changes above but the new function is more 
streamlined. 





Issue Time Tracking
-------------------

    Worklog Id:     (was: 850167)
    Time Spent: 1h 50m  (was: 1h 40m)

> Skip scheduling flows far into future
> -------------------------------------
>
>                 Key: GOBBLIN-1797
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1797
>             Project: Apache Gobblin
>          Issue Type: Improvement
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> The unschedule feature linked below sets a schedule to run Jan 1st of 2050 so 
> far in advance that it will "never run" 
> [https://jarvis.corp.linkedin.com/codesearch/result/?name=FlowConfigResourceLocalHandler.java&path=gobblin-elr%2Fgobblin-restli%2Fgobblin-flow-config-service%2Fgobblin-flow-config-service-server%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fgobblin%2Fservice&reponame=linkedin%2Fgobblin-elr#62]
>  but potentially there are over 100k of these flows so we are loading and 
> scheduling many unnecessary flows. On initialization we add a check that 
> verifies the next run of the flow is within a certain time frame (100 days by 
> default) and loads it into the scheduler if it is within that time frame. We 
> choose that default value under the assumption that we will redeploy GaaS at 
> least every 100 days and then if we approach a far out scheduled flow we will 
> load it into the Scheduler. However, in most cases uses schedule flows for 
> near future or immediately and those will all be scheduled. This PR also 
> renames metrics and adds helpful new ones. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to