[ 
https://issues.apache.org/jira/browse/GOBBLIN-1840?focusedWorklogId=867597&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-867597
 ]

ASF GitHub Bot logged work on GOBBLIN-1840:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Jun/23 22:20
            Start Date: 26/Jun/23 22:20
    Worklog Time Spent: 10m 
      Work Description: homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1242790138


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, 
jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, 
jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
         scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+            listener);
       } else {
-        LOGGER.info("No job schedule found, so running job " + jobUri);
+        LOGGER.info("No job schedule"
+            + " found, so running job " + jobUri);

Review Comment:
   nit: Does not need to be on a new line



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, 
jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, 
jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
         scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+            listener);
       } else {
-        LOGGER.info("No job schedule found, so running job " + jobUri);
+        LOGGER.info("No job schedule"
+            + " found, so running job " + jobUri);
         this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new 
GobblinHelixJobLauncherListener(this.launcherMetrics)));
+            listener));
       }
+
     } catch (JobException je) {
       LOGGER.error("Failed to schedule or run job " + jobUri, je);
     }
   }
 
   @Subscribe
-  public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent 
updateJobArrival) {
+  public synchronized void 
handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + 
updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+
+    if (this.isThrottleEnabled &&
+        this.jobNameToNextSchedulableTime.getOrDefault(jobName, 
Instant.ofEpochMilli(0)).isAfter(clock.instant())) {

Review Comment:
   Nit: This line is a bit dense. Also, to indicate the beginning of time, 
`Instant` already provides the constants `Instant.MIN` and `Instant.EPOCH`, 
which should be more readable than `Instant.ofEpochMilli(0)`.
   
   Also, intuitively it feels a little weird to read as "nextSchedulableTime is 
after current time". I feel it's more intuitive for it to be
   
   "current time is before nextSchedulableTime" i.e.
   ```
   clock.instant().isBefore(jobNameToNextSchedulableTime.getOrDefault(jobName, 
Instant.ofEpochMilli(0)))
   ```
   or IMO even more readable
   ```
   Instant nextSchedulableTime = 
jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.MIN);
   if (this.isThrottleEnabled && clock.instant().isBefore(nextSchedulableTime)) 
{
   ...
   ```



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends 
GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = 
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public 
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics 
jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration 
helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = 
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next 
schedulable time is  " + nextSchedulableTime );
+  }
+
+  @Override
+  public void onJobCompletion(JobContext jobContext)
+      throws Exception {
+    super.onJobCompletion(jobContext);
+    if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
Instant.ofEpochMilli(0));
+    } else {
+      Instant nextSchedulableTime = 
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
nextSchedulableTime);
+      LOG.info(jobContext.getJobName() + " finished completion. The next 
schedulable time is " + nextSchedulableTime );
+    }
+  }
+
+  @Override
+  public void onJobCancellation(JobContext jobContext)
+      throws Exception {
+    super.onJobCancellation(jobContext);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
Instant.ofEpochMilli(0));

Review Comment:
   Same as for the job-failed case. We'd want something similar here.



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -70,10 +87,18 @@ public class GobblinHelixJobSchedulerTest {
   private GobblinTaskRunner gobblinTaskRunner;
 
   private Thread thread;
-
   private final String workflowIdSuffix1 = "_1504201348471";
   private final String workflowIdSuffix2 = "_1504201348472";
 
+  private Instant beginTime = Instant.ofEpochMilli(0);
+  private Instant withinThrottlePeriod = beginTime.plus(1, ChronoUnit.SECONDS);

Review Comment:
   Nit: seems like these can be final



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, 
jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, 
jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
         scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+            listener);
       } else {
-        LOGGER.info("No job schedule found, so running job " + jobUri);
+        LOGGER.info("No job schedule"
+            + " found, so running job " + jobUri);
         this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new 
GobblinHelixJobLauncherListener(this.launcherMetrics)));
+            listener));

Review Comment:
   nit: Does not need to be on a new line



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends 
GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = 
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public 
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics 
jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration 
helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = 
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next 
schedulable time is  " + nextSchedulableTime );
+  }
+
+  @Override
+  public void onJobCompletion(JobContext jobContext)
+      throws Exception {
+    super.onJobCompletion(jobContext);
+    if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
Instant.ofEpochMilli(0));

Review Comment:
   Also, instead of ofEpochMilli(0), let's use `Instant.EPOCH`



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, 
jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, 
jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
         scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+            listener);

Review Comment:
   nit: Does not need to be on a new line



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends 
GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = 
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public 
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics 
jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration 
helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = 
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next 
schedulable time is  " + nextSchedulableTime );
+  }
+
+  @Override
+  public void onJobCompletion(JobContext jobContext)
+      throws Exception {
+    super.onJobCompletion(jobContext);
+    if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
Instant.ofEpochMilli(0));
+    } else {
+      Instant nextSchedulableTime = 
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
nextSchedulableTime);
+      LOG.info(jobContext.getJobName() + " finished completion. The next 
schedulable time is " + nextSchedulableTime );

Review Comment:
   Maybe `is completed` instead of `finished completion`?



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
       jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, 
jobUri);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, 
jobNameToNextSchedulableTime,
+              jobSchedulingThrottleTimeout, clock)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + jobUri);
         scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+            listener);
       } else {
-        LOGGER.info("No job schedule found, so running job " + jobUri);
+        LOGGER.info("No job schedule"
+            + " found, so running job " + jobUri);
         this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new 
GobblinHelixJobLauncherListener(this.launcherMetrics)));
+            listener));
       }
+

Review Comment:
   nit: Does not need a new line



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -443,6 +487,10 @@ private void 
cancelJobIfRequired(DeleteJobConfigArrivalEvent deleteJobArrival) t
     }
   }
 
+  public void setThrottleEnabled(boolean throttleEnabled) {

Review Comment:
   Use Lombok's `@Setter` instead of declaring the setter by hand. Also, why do 
we need this? Isn't this value settable via the config, even for testing?



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends 
GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = 
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public 
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics 
jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration 
helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = 
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next 
schedulable time is  " + nextSchedulableTime );

Review Comment:
   Nit: grammar "finished preparing"



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends 
GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = 
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public 
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics 
jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration 
helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = 
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next 
schedulable time is  " + nextSchedulableTime );

Review Comment:
   Nit: remove the extra whitespace after `nextSchedulableTime` (before the closing parenthesis).



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends 
GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = 
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private Duration helixJobSchedulingThrottleTimeout;
+  private Clock clock;
+
+  public 
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics 
jobLauncherMetrics,
+      ConcurrentHashMap jobNameToNextSchedulableTime, Duration 
helixJobSchedulingThrottleTimeout, Clock clock) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+    this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+    this.clock = clock;
+  }
+
+  @Override
+  public void onJobPrepare(JobContext jobContext)
+      throws Exception {
+    super.onJobPrepare(jobContext);
+    Instant nextSchedulableTime = 
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
nextSchedulableTime);
+    LOG.info(jobContext.getJobName() + " finished prepare. The next 
schedulable time is  " + nextSchedulableTime );
+  }
+
+  @Override
+  public void onJobCompletion(JobContext jobContext)
+      throws Exception {
+    super.onJobCompletion(jobContext);
+    if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), 
Instant.ofEpochMilli(0));

Review Comment:
   Add a log if the job failed. I see there is an existing log for the entire 
job context, but having a log specifically from the throttling scheduler would 
be important here for those not familiar with the code when they are debugging
   
   
https://github.com/apache/gobblin/blob/702cadf48f910c79b129032aa673f08ce4397c03/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/AbstractJobListener.java#L59-L63



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -52,16 +62,23 @@
 import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
 import org.apache.gobblin.scheduler.SchedulerService;
 
+import static org.mockito.Mockito.when;
+
 
 /**
  * Unit tests for {@link org.apache.gobblin.cluster.GobblinHelixJobScheduler}.
  *
  */
-@Test(groups = {"gobblin.cluster"})
+
+/**
+ * In all test cases, we use GobblinHelixManagerFactory instead of
+ * HelixManagerFactory, and use HelixManager as a local variable to avoid
+ * the HelixManager (ZkClient) is not connected error when that's set as

Review Comment:
   Potentially better wording
   > and instantiate a local HelixManager per test to provide isolation and 
prevent errors caused by the ZKClient being shared (e.g. ZKClient is not 
connected exceptions). 



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -70,10 +87,18 @@ public class GobblinHelixJobSchedulerTest {
   private GobblinTaskRunner gobblinTaskRunner;
 
   private Thread thread;
-
   private final String workflowIdSuffix1 = "_1504201348471";
   private final String workflowIdSuffix2 = "_1504201348472";
 
+  private Instant beginTime = Instant.ofEpochMilli(0);

Review Comment:
   `Instant.EPOCH`



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -52,16 +62,23 @@
 import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
 import org.apache.gobblin.scheduler.SchedulerService;
 
+import static org.mockito.Mockito.when;
+
 
 /**
  * Unit tests for {@link org.apache.gobblin.cluster.GobblinHelixJobScheduler}.
  *
  */
-@Test(groups = {"gobblin.cluster"})
+

Review Comment:
   Merge this new comment block with the Javadoc above.



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +148,176 @@ public void setUp()
     this.thread.start();
   }
 
+  // Time span exceeds throttle timeout, within same workflow, throttle is 
enabled
+  // Job will be updated

Review Comment:
   Comments describing the method should be a Javadoc comment instead of a 
regular comment.



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +148,176 @@ public void setUp()
     this.thread.start();
   }
 
+  // Time span exceeds throttle timeout, within same workflow, throttle is 
enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is 
enabled
+  // Job will not be updated
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateSameWorkflowShortPeriodThrottle()
       throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  // Time span exceeds throttle timeout, within same workflow, throttle is not 
enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is 
not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle 
is enabled
+  // Job will be updated
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle 
is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle 
is enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle 
is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager 
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = 
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + 
GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, 
java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, 
"1", workflowIdSuffix1);
-    
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
+    GobblinHelixJobScheduler gobblinHelixJobScheduler;
+    if (isThrottleEnabled) {
+      gobblinHelixJobScheduler = new 
GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, 
java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog, clock);
+    }
+    else {
+      gobblinHelixJobScheduler = new 
GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, 
java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
+    }
+    gobblinHelixJobScheduler.setThrottleEnabled(isThrottleEnabled);

Review Comment:
   nit: wouldn't we want to set this via config?



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +148,176 @@ public void setUp()
     this.thread.start();
   }
 
+  // Time span exceeds throttle timeout, within same workflow, throttle is 
enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is 
enabled
+  // Job will not be updated
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateSameWorkflowShortPeriodThrottle()
       throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  // Time span exceeds throttle timeout, within same workflow, throttle is not 
enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is 
not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle 
is enabled
+  // Job will be updated
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle 
is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle 
is enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle 
is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager 
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = 
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + 
GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, 
java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, 
"1", workflowIdSuffix1);
-    
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
+    GobblinHelixJobScheduler gobblinHelixJobScheduler;
+    if (isThrottleEnabled) {
+      gobblinHelixJobScheduler = new 
GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, 
java.util.Optional.empty(),

Review Comment:
   We can inject the clock regardless of whether throttling is enabled. We'd never 
want to use the UTC clock in a unit test, IMO.



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +148,176 @@ public void setUp()
     this.thread.start();
   }
 
+  // Time span exceeds throttle timeout, within same workflow, throttle is 
enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is 
enabled
+  // Job will not be updated
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateSameWorkflowShortPeriodThrottle()
       throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  // Time span exceeds throttle timeout, within same workflow, throttle is not 
enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within same workflow, throttle is 
not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle 
is enabled
+  // Job will be updated
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span is within throttle timeout, within different workflow, throttle 
is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle 
is enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  // Time span exceeds throttle timeout, within different workflow, throttle 
is not enabled
+  // Job will be updated
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager 
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = 
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + 
GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, 
java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, 
"1", workflowIdSuffix1);
-    
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
+    GobblinHelixJobScheduler gobblinHelixJobScheduler;
+    if (isThrottleEnabled) {
+      gobblinHelixJobScheduler = new 
GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, 
java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog, clock);
+    }
+    else {
+      gobblinHelixJobScheduler = new 
GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager, 
java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
+    }
+    gobblinHelixJobScheduler.setThrottleEnabled(isThrottleEnabled);
+    return gobblinHelixJobScheduler;
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties 
properties) {
+    
properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new 
NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + 
workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new 
NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties);
+    return newJobConfigArrivalEvent;
+  }
+
+  private void connectAndAssertWorkflowId(String expectedSuffix, 
NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) 
throws Exception {
+    helixManager.connect();
+    String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
+    Assert.assertNotNull(workFlowId);
+    Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+  }
 
-    String workFlowId = null;
+  private String getWorkflowID (NewJobConfigArrivalEvent 
newJobConfigArrivalEvent, HelixManager helixManager)
+      throws Exception {
+    // endTime is manually set time period that we allow HelixUtils to fetch 
workflowIdMap before timeout

Review Comment:
   Maybe better wording:
   > Poll Helix for up to 30 seconds until a workflow with a matching 
job name exists in Helix, and then return that workflowID



##########
gobblin-cluster/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker:
##########
@@ -0,0 +1 @@
+mock-maker-inline

Review Comment:
   I don't think we need this anymore since we are not mocking any static 
classes





Issue Time Tracking
-------------------

    Worklog Id:     (was: 867597)
    Time Spent: 2h 40m  (was: 2.5h)

> Helix Job scheduler should not try to replace running workflow if within 
> configured time
> ----------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1840
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1840
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Matthew Ho
>            Priority: Major
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Reply via email to