homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1242790138


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, 
jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, 
jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
         scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+            listener);
       } else {
-        LOGGER.info("No job schedule found, so running job " + jobUri);
+        LOGGER.info("No job schedule"
+            + " found, so running job " + jobUri);

Review Comment:
   nit: Does not need to be on a new line



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, 
jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, 
jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
         scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+            listener);
       } else {
-        LOGGER.info("No job schedule found, so running job " + jobUri);
+        LOGGER.info("No job schedule"
+            + " found, so running job " + jobUri);
         this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new 
GobblinHelixJobLauncherListener(this.launcherMetrics)));
+            listener));
       }
+
     } catch (JobException je) {
       LOGGER.error("Failed to schedule or run job " + jobUri, je);
     }
   }
 
   @Subscribe
-  public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent 
updateJobArrival) {
+  public synchronized void 
handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + 
updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+
+    if (this.isThrottleEnabled &&
+        this.jobNameToNextSchedulableTime.getOrDefault(jobName, 
Instant.ofEpochMilli(0)).isAfter(clock.instant())) {

Review Comment:
   Nit: This line is a bit dense. Also, to indicate the beginning of time, 
`Instant` provides the constants `Instant.MIN` and `Instant.EPOCH`, which would 
be more readable.
   
   Also, intuitively it feels a little weird to read as "nextSchedulableTime is 
after current time". I feel it's more intuitive for it to be
   
   "current time is before nextSchedulableTime" i.e.
   ```
   clock.instant().isBefore(jobNameToNextSchedulableTime.getOrDefault(jobName, 
Instant.ofEpochMilli(0)))
   ```
   or IMO even more readable
   ```
   Instant nextSchedulableTime = 
jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.MIN);
   if (this.isThrottleEnabled && clock.instant().isBefore(nextSchedulableTime)) 
{
   ...
   ```



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends 
GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = 
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public 
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics 
jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration 
helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = 
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next 
schedulable time is  " + nextSchedulableTime );
+  }
+
+  @Override
+  public void onJobCompletion(JobContext jobContext)
+      throws Exception {
+    super.onJobCompletion(jobContext);
+    if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
Instant.ofEpochMilli(0));
+    } else {
+      Instant nextSchedulableTime = 
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
nextSchedulableTime);
+      LOG.info(jobContext.getJobName() + " finished completion. The next 
schedulable time is " + nextSchedulableTime );
+    }
+  }
+
+  @Override
+  public void onJobCancellation(JobContext jobContext)
+      throws Exception {
+    super.onJobCancellation(jobContext);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
Instant.ofEpochMilli(0));

Review Comment:
   Same as for the job-failed case: we'd want something similar here.



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -70,10 +87,18 @@ public class GobblinHelixJobSchedulerTest {
   private GobblinTaskRunner gobblinTaskRunner;
 
   private Thread thread;
-
   private final String workflowIdSuffix1 = "_1504201348471";
   private final String workflowIdSuffix2 = "_1504201348472";
 
+  private Instant beginTime = Instant.ofEpochMilli(0);
+  private Instant withinThrottlePeriod = beginTime.plus(1, ChronoUnit.SECONDS);

Review Comment:
   Nit: seems like these can be final



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, 
jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, 
jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
         scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+            listener);
       } else {
-        LOGGER.info("No job schedule found, so running job " + jobUri);
+        LOGGER.info("No job schedule"
+            + " found, so running job " + jobUri);
         this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new 
GobblinHelixJobLauncherListener(this.launcherMetrics)));
+            listener));

Review Comment:
   nit: Does not need to be on a new line



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends 
GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = 
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public 
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics 
jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration 
helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = 
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next 
schedulable time is  " + nextSchedulableTime );
+  }
+
+  @Override
+  public void onJobCompletion(JobContext jobContext)
+      throws Exception {
+    super.onJobCompletion(jobContext);
+    if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
Instant.ofEpochMilli(0));

Review Comment:
   Also, instead of ofEpochMilli(0), let's use `Instant.EPOCH`



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, 
jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, 
jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
         scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+            listener);

Review Comment:
   nit: Does not need to be on a new line



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends 
GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = 
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public 
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics 
jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration 
helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = 
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next 
schedulable time is  " + nextSchedulableTime );
+  }
+
+  @Override
+  public void onJobCompletion(JobContext jobContext)
+      throws Exception {
+    super.onJobCompletion(jobContext);
+    if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
Instant.ofEpochMilli(0));
+    } else {
+      Instant nextSchedulableTime = 
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
nextSchedulableTime);
+      LOG.info(jobContext.getJobName() + " finished completion. The next 
schedulable time is " + nextSchedulableTime );

Review Comment:
   Maybe `is completed` instead of `finished completion`?



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, 
jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, 
jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
         scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+            listener);
       } else {
-        LOGGER.info("No job schedule found, so running job " + jobUri);
+        LOGGER.info("No job schedule"
+            + " found, so running job " + jobUri);
         this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new 
GobblinHelixJobLauncherListener(this.launcherMetrics)));
+            listener));
       }
+

Review Comment:
   nit: Does not need a new line



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -443,6 +487,10 @@ private void 
cancelJobIfRequired(DeleteJobConfigArrivalEvent deleteJobArrival) t
     }
   }
 
+  public void setThrottleEnabled(boolean throttleEnabled) {

Review Comment:
   Use Lombok's `@Setter` instead of declaring a setter by hand. Also, why do we 
need this? Isn't this value settable via the config, even for testing?



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends 
GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = 
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public 
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics 
jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration 
helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = 
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next 
schedulable time is  " + nextSchedulableTime );

Review Comment:
   Nit: grammar "finished preparing"



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends 
GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = 
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public 
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics 
jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration 
helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = 
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next 
schedulable time is  " + nextSchedulableTime );

Review Comment:
   Extra whitespace after `nextSchedulableTime` (before the closing parenthesis)



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends 
GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = 
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public 
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics 
jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration 
helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = 
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next 
schedulable time is  " + nextSchedulableTime );
+  }
+
+  @Override
+  public void onJobCompletion(JobContext jobContext)
+      throws Exception {
+    super.onJobCompletion(jobContext);
+    if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
Instant.ofEpochMilli(0));

Review Comment:
   Add a log if the job failed. I see there is an existing log for the entire 
job context, but having a log specifically from the throttling scheduler would 
be important here for those who are debugging and not familiar with the code.
   
   
https://github.com/apache/gobblin/blob/702cadf48f910c79b129032aa673f08ce4397c03/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/AbstractJobListener.java#L59-L63



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -52,16 +62,23 @@
 import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
 import org.apache.gobblin.scheduler.SchedulerService;
 
+import static org.mockito.Mockito.when;
+
 
 /**
  * Unit tests for {@link org.apache.gobblin.cluster.GobblinHelixJobScheduler}.
  *
  */
-@Test(groups = {"gobblin.cluster"})
+
+/**
+ * In all test cases, we use GobblinHelixManagerFactory instead of
+ * HelixManagerFactory, and use HelixManager as a local variable to avoid
+ * the HelixManager (ZkClient) is not connected error when that's set as

Review Comment:
   Potentially better wording
   > and instantiate a local HelixManager per test to provide isolation and 
prevent errors caused by the ZKClient being shared (e.g. ZKClient is not 
connected exceptions). 



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -70,10 +87,18 @@ public class GobblinHelixJobSchedulerTest {
   private GobblinTaskRunner gobblinTaskRunner;
 
   private Thread thread;
-
   private final String workflowIdSuffix1 = "_1504201348471";
   private final String workflowIdSuffix2 = "_1504201348472";
 
+  private Instant beginTime = Instant.ofEpochMilli(0);

Review Comment:
   `Instant.EPOCH`



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -52,16 +62,23 @@
 import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
 import org.apache.gobblin.scheduler.SchedulerService;
 
+import static org.mockito.Mockito.when;
+
 
 /**
  * Unit tests for {@link org.apache.gobblin.cluster.GobblinHelixJobScheduler}.
  *
  */
-@Test(groups = {"gobblin.cluster"})
+

Review Comment:
   Merge this with the Javadoc above



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +148,176 @@ public void setUp()
     this.thread.start();
   }
 
+  // Time span exceeds throttle timeout, within same workflow, throttle is 
enabled
+  // Job will be updated

Review Comment:
   Comments describing the method should be a Javadoc comment instead of a 
regular comment



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +148,176 @@ public void setUp()
     this.thread.start();
   }
 
+  // Time span exceeds throttle timeout, within same workflow, throttle is 
enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is 
enabled
+  // Job will not be updated
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateSameWorkflowShortPeriodThrottle()
       throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  // Time span exceeds throttle timeout, within same workflow, throttle is not 
enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is 
not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle 
is enabled
+  // Job will be updated
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle 
is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle 
is enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle 
is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager 
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = 
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + 
GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, 
java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, 
"1", workflowIdSuffix1);
-    
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
+    GobblinHelixJobScheduler gobblinHelixJobScheduler;
+    if (isThrottleEnabled) {
+      gobblinHelixJobScheduler = new 
GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, 
java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog, clock);
+    }
+    else {
+      gobblinHelixJobScheduler = new 
GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, 
java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
+    }
+    gobblinHelixJobScheduler.setThrottleEnabled(isThrottleEnabled);

Review Comment:
   nit: wouldn't we want to set this via config?



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +148,176 @@ public void setUp()
     this.thread.start();
   }
 
+  // Time span exceeds throttle timeout, within same workflow, throttle is 
enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is 
enabled
+  // Job will not be updated
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateSameWorkflowShortPeriodThrottle()
       throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  // Time span exceeds throttle timeout, within same workflow, throttle is not 
enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is 
not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle 
is enabled
+  // Job will be updated
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle 
is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle 
is enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle 
is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager 
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = 
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + 
GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, 
java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, 
"1", workflowIdSuffix1);
-    
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
+    GobblinHelixJobScheduler gobblinHelixJobScheduler;
+    if (isThrottleEnabled) {
+      gobblinHelixJobScheduler = new 
GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, 
java.util.Optional.empty(),

Review Comment:
   We can inject the clock regardless of whether throttling is enabled. We'd 
never want to use the UTC clock in a unit test, IMO.



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +148,176 @@ public void setUp()
     this.thread.start();
   }
 
+  // Time span exceeds throttle timeout, within same workflow, throttle is 
enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is 
enabled
+  // Job will not be updated
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateSameWorkflowShortPeriodThrottle()
       throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  // Time span exceeds throttle timeout, within same workflow, throttle is not 
enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is 
not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle 
is enabled
+  // Job will be updated
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle 
is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle 
is enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle 
is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager 
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = 
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + 
GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, 
java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, 
"1", workflowIdSuffix1);
-    
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
+    GobblinHelixJobScheduler gobblinHelixJobScheduler;
+    if (isThrottleEnabled) {
+      gobblinHelixJobScheduler = new 
GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, 
java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog, clock);
+    }
+    else {
+      gobblinHelixJobScheduler = new 
GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, 
java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
+    }
+    gobblinHelixJobScheduler.setThrottleEnabled(isThrottleEnabled);
+    return gobblinHelixJobScheduler;
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties 
properties) {
+    
properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new 
NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + 
workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new 
NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties);
+    return newJobConfigArrivalEvent;
+  }
+
+  private void connectAndAssertWorkflowId(String expectedSuffix, 
NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) 
throws Exception {
+    helixManager.connect();
+    String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
+    Assert.assertNotNull(workFlowId);
+    Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+  }
 
-    String workFlowId = null;
+  private String getWorkflowID (NewJobConfigArrivalEvent 
newJobConfigArrivalEvent, HelixManager helixManager)
+      throws Exception {
+    // endTime is manually set time period that we allow HelixUtils to fetch 
workflowIdMap before timeout

Review Comment:
   Maybe better wording:
   > Poll Helix for up to 30 seconds until a workflow with a matching job name 
exists in Helix, and then return that workflow ID



##########
gobblin-cluster/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker:
##########
@@ -0,0 +1 @@
+mock-maker-inline

Review Comment:
   I don't think we need this anymore since we are not mocking any static 
classes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to