This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ecce5bc7 [GOBBLIN-1840] Helix Job scheduler should not try to replace 
running workflow if within configured time (#3704)
1ecce5bc7 is described below

commit 1ecce5bc733b047b3c2b945cf4eba2bc21bac241
Author: Peiying Ye <[email protected]>
AuthorDate: Fri Jun 30 10:41:44 2023 -0700

    [GOBBLIN-1840] Helix Job scheduler should not try to replace running 
workflow if within configured time (#3704)
    
    * [GOBBLIN-1840] Helix Job scheduler should not try to replace running 
workflow if within configured time
    
    * [GOBBLIN-1840] Remove unnecessary files
    
    * [GOBBLIN-1840] Add config for throttleTimeoutDuration
    
    * [GOBBLIN-1840] Clean up format and coding standard
    
    * [GOBBLIN-1840] Clean up format layout
    
    * [GOBBLIN-1840] Clean up auto format
    
    * [GOBBLIN-1840] Clear up empty space
    
    * [GOBBLIN-1840] Clarify naming standards and simplify repeated codes
    
    * [GOBBLIN-1840] Add Javadoc on GobblinHelixJobSchedulerTest for setting 
HelixManager as local variable
    
    * [GOBBLIN-1840] Optimize imports and fix unit test errors
    
    * [GOBBLIN-1840] Rewrite log info and add Javadoc
    
    * [GOBBLIN-1840] Remove job status check
    
    * [GOBBLIN-1840] Add log info and change config setting
    
    * [GOBBLIN-1840] Add @Slf4j in GobblinThrottlingHelixJobLauncherListener
    
    * [GOBBLIN-1840] Fix race condition of handleNewJobConfigArrival
    
    * [GOBBLIN-1840] Improve mockClock mechanism
    
    * [GOBBLIN-1840] Address comments
    
    * [GOBBLIN-1840] Only put entry in jobNameToNextSchedulableTime when 
throttle is enabled
    
    * [GOBBLIN-1840] Remove extra schedulable time updates
    
    * [GOBBLIN-1840] Fix checkstyle problems
    
    ---------
    
    Co-authored-by: Peiying Ye <[email protected]>
---
 .../cluster/GobblinClusterConfigurationKeys.java   |   9 +
 .../gobblin/cluster/GobblinHelixJobScheduler.java  | 134 +++++++++---
 .../GobblinThrottlingHelixJobLauncherListener.java |  69 ++++++
 .../org/apache/gobblin/cluster/HelixUtils.java     |   6 +-
 .../cluster/GobblinHelixJobSchedulerTest.java      | 243 ++++++++++++++++-----
 5 files changed, 385 insertions(+), 76 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 31a8547aa..ef83ab029 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -17,6 +17,8 @@
 
 package org.apache.gobblin.cluster;
 
+import java.time.Duration;
+
 import org.apache.gobblin.annotation.Alpha;
 
 
@@ -222,4 +224,11 @@ public class GobblinClusterConfigurationKeys {
   public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX + 
"containerId";
 
   public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX = 
GOBBLIN_CLUSTER_PREFIX + "sysProps";
+
+  public static final String HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = 
"helix.job.scheduling.throttle.enabled";
+  public static final boolean 
DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = false;
+
+  public static final String HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY 
= "helix.job.scheduling.throttle.timeout.seconds";
+  public static final long 
DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY = 
Duration.ofMinutes(40).getSeconds();;
+
 }
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 20f47da3e..d30554d2b 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -18,6 +18,10 @@
 package org.apache.gobblin.cluster;
 
 import java.io.IOException;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -111,14 +115,28 @@ public class GobblinHelixJobScheduler extends 
JobScheduler implements StandardMe
   private boolean startServicesCompleted;
   private final long helixJobStopTimeoutMillis;
 
+  /**
+   * The throttling timeout prevents helix workflows with the same job name / 
URI from being submitted
+   * more than once within the timeout period. This timeout is not reset by 
deletes / cancels, meaning that
+   * if you delete a workflow within the timeout period, you cannot reschedule 
until the timeout period is complete.
+   * However, if there is an error when launching the job, you can immediately 
reschedule the flow. <br><br>
+   *
+   * NOTE: This throttle timeout period starts when the job launcher thread 
picks up the runnable, meaning that the
+   * time it takes to submit to Helix and start running the flow is also 
included as part of the timeout period.
+   */
+  private final Duration jobSchedulingThrottleTimeout;
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+  private boolean isThrottleEnabled;
+  private Clock clock;
+
   public GobblinHelixJobScheduler(Config sysConfig,
                                   HelixManager jobHelixManager,
                                   Optional<HelixManager> 
taskDriverHelixManager,
                                   EventBus eventBus,
                                   Path appWorkDir, List<? extends Tag<?>> 
metadataTags,
                                   SchedulerService schedulerService,
-                                  MutableJobCatalog jobCatalog) throws 
Exception {
-
+                                  MutableJobCatalog jobCatalog,
+                                  Clock clock) throws Exception {
     super(ConfigUtils.configToProperties(sysConfig), schedulerService);
     this.commonJobProperties = 
ConfigUtils.configToProperties(ConfigUtils.getConfigOrEmpty(sysConfig, 
COMMON_JOB_PROPS));
     this.jobHelixManager = jobHelixManager;
@@ -162,6 +180,27 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
     this.helixWorkflowListingTimeoutMillis = ConfigUtils.getLong(sysConfig, 
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS,
         
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS) 
* 1000;
 
+    this.jobSchedulingThrottleTimeout = 
Duration.of(ConfigUtils.getLong(sysConfig, 
GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY,
+        
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY),
 ChronoUnit.SECONDS);
+
+    this.jobNameToNextSchedulableTime = new ConcurrentHashMap<>();
+
+    this.isThrottleEnabled = ConfigUtils.getBoolean(sysConfig, 
GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+        
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY);
+
+    this.clock = clock;
+  }
+
+  public GobblinHelixJobScheduler(Config sysConfig,
+      HelixManager jobHelixManager,
+      Optional<HelixManager> taskDriverHelixManager,
+      EventBus eventBus,
+      Path appWorkDir, List<? extends Tag<?>> metadataTags,
+      SchedulerService schedulerService,
+      MutableJobCatalog jobCatalog) throws Exception {
+
+    this(sysConfig, jobHelixManager, taskDriverHelixManager, eventBus, 
appWorkDir, metadataTags,
+        schedulerService, jobCatalog, Clock.systemUTC());
   }
 
   @Override
@@ -206,9 +245,9 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
 
     if (cleanAllDistJobs) {
       for (org.apache.gobblin.configuration.State state : 
this.jobsMapping.getAllStates()) {
-        String jobUri = state.getId();
-        LOGGER.info("Delete mapping for job " + jobUri);
-        this.jobsMapping.deleteMapping(jobUri);
+        String jobName = state.getId();
+        LOGGER.info("Delete mapping for job " + jobName);
+        this.jobsMapping.deleteMapping(jobName);
       }
     }
   }
@@ -303,36 +342,70 @@ public class GobblinHelixJobScheduler extends 
JobScheduler implements StandardMe
   }
 
   @Subscribe
-  public void handleNewJobConfigArrival(NewJobConfigArrivalEvent 
newJobArrival) {
-    String jobUri = newJobArrival.getJobName();
-    LOGGER.info("Received new job configuration of job " + jobUri);
+  public synchronized void handleNewJobConfigArrival(NewJobConfigArrivalEvent 
newJobArrival) {
+    String jobName = newJobArrival.getJobName();
+    LOGGER.info("Received new job configuration of job " + jobName);
+
+    Instant nextSchedulableTime = 
jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.EPOCH);
+    if (this.isThrottleEnabled && 
clock.instant().isBefore(nextSchedulableTime)) {
+      LOGGER.info("Adding new job is skipped for job {}. Current time is {} 
and the next schedulable time would be {}",
+          jobName,
+          clock.instant(),
+          nextSchedulableTime
+      );
+      return;
+    }
+
+    if (isThrottleEnabled) {
+      nextSchedulableTime = clock.instant().plus(jobSchedulingThrottleTimeout);
+      jobNameToNextSchedulableTime.put(jobName, nextSchedulableTime);
+    }
+
     try {
       Properties jobProps = new Properties();
       jobProps.putAll(this.commonJobProperties);
       jobProps.putAll(newJobArrival.getJobConfig());
 
       // set uri so that we can delete this job later
-      jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, 
jobUri);
+      jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, 
jobName);
 
       this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+      GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+          new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics, 
jobNameToNextSchedulableTime)
+          : new GobblinHelixJobLauncherListener(this.launcherMetrics);
       if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
-        LOGGER.info("Scheduling job " + jobUri);
-        scheduleJob(jobProps,
-                    new GobblinHelixJobLauncherListener(this.launcherMetrics));
+        LOGGER.info("Scheduling job " + jobName);
+        scheduleJob(jobProps, listener);
       } else {
-        LOGGER.info("No job schedule found, so running job " + jobUri);
-        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
-                                 new 
GobblinHelixJobLauncherListener(this.launcherMetrics)));
+        LOGGER.info("No job schedule found, so running job " + jobName);
+        this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, 
listener));
       }
     } catch (JobException je) {
-      LOGGER.error("Failed to schedule or run job " + jobUri, je);
+      LOGGER.error("Failed to schedule or run job {} . Reset the next 
scheduable time to {}",
+          jobName,
+          Instant.EPOCH,
+          je);
+      if (isThrottleEnabled) {
+        jobNameToNextSchedulableTime.put(jobName, Instant.EPOCH);
+      }
     }
   }
 
   @Subscribe
-  public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent 
updateJobArrival) {
+  public synchronized void 
handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + 
updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+
+    Instant nextSchedulableTime = 
jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.EPOCH);
+    if (this.isThrottleEnabled && 
clock.instant().isBefore(nextSchedulableTime)) {
+      LOGGER.info("Replanning is skipped for job {}. Current time is {} and 
the next schedulable time would be {}",
+          jobName,
+          clock.instant(),
+          nextSchedulableTime
+      );
+      return;
+    }
+
     try {
       handleDeleteJobConfigArrival(new 
DeleteJobConfigArrivalEvent(updateJobArrival.getJobName(),
           updateJobArrival.getJobConfig()));
@@ -359,8 +432,17 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
     }
   }
 
+  /***
+   * Deleting a workflow with throttling enabled means that the next
+   * schedulable time for the workflow will remain unchanged.
+   * Note: In such case, it is required to wait until the throttle
+   * timeout period elapses before the workflow can be rescheduled.
+   *
+   * @param deleteJobArrival
+   * @throws InterruptedException
+   */
   @Subscribe
-  public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent 
deleteJobArrival) throws InterruptedException {
+  public synchronized void 
handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) 
throws InterruptedException {
     LOGGER.info("Received delete for job configuration of job " + 
deleteJobArrival.getJobName());
     try {
       unscheduleJob(deleteJobArrival.getJobName());
@@ -373,8 +455,8 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
   @Subscribe
   public void handleCancelJobConfigArrival(CancelJobConfigArrivalEvent 
cancelJobArrival)
       throws InterruptedException {
-    String jobUri = cancelJobArrival.getJoburi();
-    LOGGER.info("Received cancel for job configuration of job " + jobUri);
+    String jobName = cancelJobArrival.getJoburi();
+    LOGGER.info("Received cancel for job configuration of job " + jobName);
     Optional<String> distributedJobMode;
     Optional<String> planningJob = Optional.empty();
     Optional<String> actualJob = Optional.empty();
@@ -384,14 +466,14 @@ public class GobblinHelixJobScheduler extends 
JobScheduler implements StandardMe
     this.jobSchedulerMetrics.numCancellationStart.incrementAndGet();
 
     try {
-      distributedJobMode = this.jobsMapping.getDistributedJobMode(jobUri);
+      distributedJobMode = this.jobsMapping.getDistributedJobMode(jobName);
       if (distributedJobMode.isPresent() && 
Boolean.parseBoolean(distributedJobMode.get())) {
-        planningJob = this.jobsMapping.getPlanningJobId(jobUri);
+        planningJob = this.jobsMapping.getPlanningJobId(jobName);
       } else {
-        actualJob = this.jobsMapping.getActualJobId(jobUri);
+        actualJob = this.jobsMapping.getActualJobId(jobName);
       }
     } catch (IOException e) {
-      LOGGER.warn("jobsMapping could not be retrieved for job {}", jobUri);
+      LOGGER.warn("jobsMapping could not be retrieved for job {}", jobName);
       return;
     }
 
@@ -466,7 +548,7 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
         
GobblinHelixJobScheduler.this.jobSchedulerMetrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis,
 System.currentTimeMillis());
         GobblinHelixJobScheduler.this.runJob(this.jobProps, this.jobListener);
       } catch (JobException je) {
-        LOGGER.error("Failed to run job " + 
this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
+        LOGGER.error("Failed to schedule or run job " + 
this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
       }
     }
   }
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java
new file mode 100644
index 000000000..ec00be19b
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+@Slf4j
+public class GobblinThrottlingHelixJobLauncherListener extends 
GobblinHelixJobLauncherListener {
+
+  public final static Logger LOG = 
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+  private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+
+  public 
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics 
jobLauncherMetrics,
+      ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime) {
+    super(jobLauncherMetrics);
+    this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+  }
+
+  @Override
+  public void onJobCompletion(JobContext jobContext)
+      throws Exception {
+    super.onJobCompletion(jobContext);
+    if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+      jobNameToNextSchedulableTime.put(jobContext.getJobName(), Instant.EPOCH);
+      LOG.info("{} failed. The next schedulable time is {} so that any future 
schedule attempts will be allowed.",
+          jobContext.getJobName(), Instant.EPOCH);
+    }
+  }
+
+  @Override
+  public void onJobCancellation(JobContext jobContext)
+      throws Exception {
+    super.onJobCancellation(jobContext);
+    jobNameToNextSchedulableTime.put(jobContext.getJobName(), Instant.EPOCH);
+    LOG.info("{} is cancelled. The next schedulable time is {} so that any 
future schedule attempts will be allowed.",
+        jobContext.getJobName(), Instant.EPOCH);
+  }
+}
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index d62002967..688c8c7ba 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -417,7 +417,11 @@ public class HelixUtils {
       }
       Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
       for (String helixJob : helixJobs) {
-        Iterator<TaskConfig> taskConfigIterator = 
taskDriver.getJobConfig(helixJob).getTaskConfigMap().values().iterator();
+        JobConfig jobConfig = taskDriver.getJobConfig(helixJob);
+        if (jobConfig == null) {
+          throw new GobblinHelixUnexpectedStateException("Received null 
jobConfig from Helix. We should not see any null configs when reading all 
helixJobs. helixJob=%s", helixJob);
+        }
+        Iterator<TaskConfig> taskConfigIterator = 
jobConfig.getTaskConfigMap().values().iterator();
         if (taskConfigIterator.hasNext()) {
           TaskConfig taskConfig = taskConfigIterator.next();
           String jobName = 
taskConfig.getConfigMap().get(ConfigurationKeys.JOB_NAME_KEY);
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java
index 498e2530a..e21c7e738 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java
@@ -20,18 +20,24 @@ package org.apache.gobblin.cluster;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.nio.file.Files;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.assertj.core.util.Lists;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -52,16 +58,20 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
 import org.apache.gobblin.scheduler.SchedulerService;
 
+import static org.mockito.Mockito.when;
+
 
 /**
  * Unit tests for {@link org.apache.gobblin.cluster.GobblinHelixJobScheduler}.
  *
+ * In all test cases, we use GobblinHelixManagerFactory instead of
+ * HelixManagerFactory, and instantiate a local HelixManager per test to
+ * provide isolation and prevent errors caused by the ZKClient being shared
+ * (e.g. ZKClient is not connected exceptions).
  */
-@Test(groups = {"gobblin.cluster"})
+@Test(groups = {"gobblin.cluster"}, singleThreaded = true)
 public class GobblinHelixJobSchedulerTest {
   public final static Logger LOG = 
LoggerFactory.getLogger(GobblinHelixJobSchedulerTest.class);
-
-  private HelixManager helixManager;
   private FileSystem localFs;
   private Path appWorkDir;
   private final Closer closer = Closer.create();
@@ -70,10 +80,17 @@ public class GobblinHelixJobSchedulerTest {
   private GobblinTaskRunner gobblinTaskRunner;
 
   private Thread thread;
-
   private final String workflowIdSuffix1 = "_1504201348471";
   private final String workflowIdSuffix2 = "_1504201348472";
 
+  private final Instant beginTime = Instant.EPOCH;
+  private final Duration withinThrottlePeriod = Duration.of(1, 
ChronoUnit.SECONDS);
+  private final Duration exceedsThrottlePeriod = Duration.of(
+      
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY
 + 1, ChronoUnit.SECONDS);
+
+  private String zkConnectingString;
+  private String helixClusterName;
+
   @BeforeClass
   public void setUp()
       throws Exception {
@@ -96,17 +113,11 @@ public class GobblinHelixJobSchedulerTest {
             ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath()))
         .withValue(ConfigurationKeys.JOB_STATE_IN_STATE_STORE, 
ConfigValueFactory.fromAnyRef("true")).resolve();
 
-    String zkConnectingString = 
baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
-    String helixClusterName = 
baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    this.zkConnectingString = 
baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    this.helixClusterName = 
baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
 
     HelixUtils.createGobblinHelixCluster(zkConnectingString, helixClusterName);
 
-    this.helixManager = HelixManagerFactory
-        .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
-            zkConnectingString);
-    this.closer.register(() -> helixManager.disconnect());
-    this.helixManager.connect();
-
     this.localFs = FileSystem.getLocal(new Configuration());
 
     this.closer.register(() -> {
@@ -129,58 +140,192 @@ public class GobblinHelixJobSchedulerTest {
     this.thread.start();
   }
 
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is 
enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is 
enabled
+   * Job will not be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowShortPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+        true, true);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within same workflow, throttle is not 
enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateSameWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateSameWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within same workflow, throttle is 
not enabled
+   * Job will be updated
+   * @throws Exception
+   */
   @Test
-  public void testNewJobAndUpdate()
+  public void testUpdateSameWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateSameWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, true);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle 
is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  public void testUpdateDiffWorkflowShortPeriodThrottle()
       throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span is within throttle timeout, within different workflow, throttle 
is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(withinThrottlePeriod, 
"UpdateDiffWorkflowShortPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle 
is enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        true, false);
+  }
+
+  /***
+   * Time span exceeds throttle timeout, within different workflow, throttle 
is not enabled
+   * Job will be updated
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+      throws Exception {
+    runWorkflowTest(exceedsThrottlePeriod, 
"UpdateDiffWorkflowLongPeriodNoThrottle",
+        workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+        false, false);
+  }
+
+  private GobblinHelixJobScheduler createJobScheduler(HelixManager 
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+    java.nio.file.Path p = 
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
     Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        ConfigValueFactory.fromAnyRef("/tmp/" + 
GobblinHelixJobScheduler.class.getSimpleName()));
+        ConfigValueFactory.fromAnyRef(p.toString()));
     SchedulerService schedulerService = new SchedulerService(new Properties());
     NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
     jobCatalog.startAsync();
-    GobblinHelixJobScheduler jobScheduler =
-        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, 
java.util.Optional.empty(),
-            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog);
-
-    final Properties properties1 =
-        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, 
"1", workflowIdSuffix1);
-    
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
+    Config helixJobSchedulerConfig = 
ConfigFactory.empty().withValue(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+        ConfigValueFactory.fromAnyRef(isThrottleEnabled));
+    GobblinHelixJobScheduler gobblinHelixJobScheduler = new 
GobblinHelixJobScheduler(helixJobSchedulerConfig, helixManager, 
java.util.Optional.empty(),
+          new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, 
jobCatalog, clock);
+    return gobblinHelixJobScheduler;
+  }
 
+  private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties 
properties) {
+    
properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
 "true");
     NewJobConfigArrivalEvent newJobConfigArrivalEvent =
-        new 
NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1);
-    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
-    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
-        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + 
workflowIdSuffix2);
-    Map<String, String> workflowIdMap;
-    this.helixManager.connect();
+        new 
NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties);
+    return newJobConfigArrivalEvent;
+  }
+
+  private void connectAndAssertWorkflowId(String expectedSuffix, String 
jobName, HelixManager helixManager) throws Exception {
+    helixManager.connect();
+    String workFlowId = getWorkflowID(jobName, helixManager);
+    Assert.assertNotNull(workFlowId);
+    Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+  }
 
-    String workFlowId = null;
+  private String getWorkflowID (String jobName, HelixManager helixManager)
+      throws Exception {
+    // Poll Helix for up to 30 seconds until a workflow with a 
matching job name exists in Helix, then return that workflow ID
     long endTime = System.currentTimeMillis() + 30000;
+    Map<String, String> workflowIdMap;
     while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
+      try{
+        workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
+            Collections.singletonList(jobName));
+      } catch(GobblinHelixUnexpectedStateException e){
+        continue;
+      }
+      if (workflowIdMap.containsKey(jobName)) {
+        return workflowIdMap.get(jobName);
       }
       Thread.sleep(100);
     }
-    Assert.assertNotNull(workFlowId);
-    Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+    return null;
+  }
 
-    jobScheduler.handleUpdateJobConfigArrival(
-        new 
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties1));
-    this.helixManager.connect();
-    endTime = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < endTime) {
-      workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
-          Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
-        break;
-      }
-      Thread.sleep(100);
+  private void runWorkflowTest(Duration mockStepAmountTime, String jobSuffix,
+    String newJobWorkflowIdSuffix, String updateWorkflowIdSuffix,
+    String assertUpdateWorkflowIdSuffix, boolean isThrottleEnabled, boolean 
isSameWorkflow) throws Exception {
+    Clock mockClock = Mockito.mock(Clock.class);
+    AtomicReference<Instant> nextInstant = new AtomicReference<>(beginTime);
+    when(mockClock.instant()).thenAnswer(invocation -> 
nextInstant.getAndAccumulate(null, (currentInstant, x) -> 
currentInstant.plus(mockStepAmountTime)));
+
+    // Use GobblinHelixManagerFactory instead of HelixManagerFactory to avoid 
the connection error.
+    // helixManager is kept as a local variable to avoid the "HelixManager 
(ZkClient) is not connected" error across tests.
+    HelixManager helixManager = GobblinHelixManagerFactory
+        .getZKHelixManager(helixClusterName, 
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+            zkConnectingString);
+    GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager, 
isThrottleEnabled, mockClock);
+    final Properties properties = 
GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, jobSuffix, 
newJobWorkflowIdSuffix);
+    NewJobConfigArrivalEvent newJobConfigArrivalEvent = 
createJobConfigArrivalEvent(properties);
+    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+    connectAndAssertWorkflowId(newJobWorkflowIdSuffix, 
newJobConfigArrivalEvent.getJobName(), helixManager);
+
+    if (isSameWorkflow) {
+      properties.setProperty(ConfigurationKeys.JOB_ID_KEY,
+          "job_" + properties.getProperty(ConfigurationKeys.JOB_NAME_KEY) + 
updateWorkflowIdSuffix);
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties));
+      connectAndAssertWorkflowId(assertUpdateWorkflowIdSuffix, 
newJobConfigArrivalEvent.getJobName(), helixManager);
+    }
+    else {
+      final Properties properties2 =
+          GobblinHelixJobLauncherTest.generateJobProperties(
+              this.baseConfig, jobSuffix + '2', updateWorkflowIdSuffix);
+      NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+          new 
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2);
+      jobScheduler.handleUpdateJobConfigArrival(
+          new 
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
 properties2));
+      connectAndAssertWorkflowId(assertUpdateWorkflowIdSuffix, 
newJobConfigArrivalEvent2.getJobName(), helixManager);
     }
-    Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix2));
   }
 
   @AfterClass


Reply via email to