This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 44ed806c3 [GOBBLIN-1813] Helix workflows submission timeouts are configurable (#3677)
44ed806c3 is described below

commit 44ed806c365fe6750ce7d46f4072ce68130ab93c
Author: Matthew Ho <[email protected]>
AuthorDate: Thu Apr 13 10:51:05 2023 -0700

    [GOBBLIN-1813] Helix workflows submission timeouts are configurable (#3677)
    
    * [GOBBLIN-1813] Helix workflows submission timeouts are configurable
    
    * Use duration and add more detail to the todo
---
 .../cluster/GobblinClusterConfigurationKeys.java   |  5 +-
 ...GobblinHelixDistributeJobExecutionLauncher.java |  9 +++-
 .../gobblin/cluster/GobblinHelixJobLauncher.java   | 42 ++++++++++++++--
 .../cluster/HelixRetriggeringJobCallable.java      | 18 ++++---
 .../org/apache/gobblin/cluster/HelixUtils.java     | 57 ++++++++++------------
 .../cluster/GobblinHelixJobLauncherTest.java       | 13 +++++
 6 files changed, 100 insertions(+), 44 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index d2a60781d..31a8547aa 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -158,6 +158,9 @@ public class GobblinClusterConfigurationKeys {
   public static final String HELIX_TASK_TIMEOUT_SECONDS = 
"helix.task.timeout.seconds";
   public static final String HELIX_TASK_MAX_ATTEMPTS_KEY = 
"helix.task.maxAttempts";
 
+  public static final String HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS = 
GOBBLIN_CLUSTER_PREFIX + "workflowSubmissionTimeoutSeconds";
+  public static final long DEFAULT_HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS = 
300;
+
   public static final String HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS = 
GOBBLIN_CLUSTER_PREFIX + "workflowDeleteTimeoutSeconds";
   public static final long DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS = 300;
 
@@ -219,4 +222,4 @@ public class GobblinClusterConfigurationKeys {
   public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX + 
"containerId";
 
   public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX = 
GOBBLIN_CLUSTER_PREFIX + "sysProps";
-}
\ No newline at end of file
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index f2ba083c5..6faaebd63 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.cluster;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -96,6 +97,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
 
   private final long helixJobStopTimeoutSeconds;
 
+  private final long helixWorkflowSubmissionTimeoutSeconds;
+
   private boolean jobSubmitted;
 
   // A conditional variable for which the condition is satisfied if a 
cancellation is requested
@@ -135,6 +138,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
     this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(this.combinedConfigs,
         GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
         
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS);
+    this.helixWorkflowSubmissionTimeoutSeconds = 
ConfigUtils.getLong(this.combinedConfigs,
+        
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS,
+        
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS);
   }
 
   @Override
@@ -252,7 +258,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
         jobId,
         taskDriver,
         this.planningJobHelixManager,
-        this.workFlowExpiryTimeSeconds);
+        Duration.ofSeconds(this.workFlowExpiryTimeSeconds),
+        Duration.ofSeconds(this.helixWorkflowSubmissionTimeoutSeconds));
     this.jobSubmitted = true;
   }
 
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 6308ddc2d..c0578a997 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.cluster;
 
 import java.io.IOException;
 import java.net.URI;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -44,6 +45,7 @@ import com.github.rholder.retry.RetryException;
 import com.github.rholder.retry.Retryer;
 import com.github.rholder.retry.RetryerBuilder;
 import com.github.rholder.retry.StopStrategies;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
@@ -131,6 +133,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
   private final Config jobConfig;
   private final long workFlowExpiryTimeSeconds;
   private final long helixJobStopTimeoutSeconds;
+  private final long helixWorkflowSubmissionTimeoutSeconds;
   private Map<String, TaskConfig> workUnitToHelixConfig;
   private Retryer<Boolean> taskRetryer;
 
@@ -163,6 +166,10 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
         ConfigUtils.getLong(jobConfig, 
GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
             
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS);
 
+    this.helixWorkflowSubmissionTimeoutSeconds = ConfigUtils.getLong(jobConfig,
+        
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS,
+        
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS);
+
     Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps)
         .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, 
ConfigValueFactory.fromAnyRef(
             new URI(appWorkDir.toUri().getScheme(), null, 
appWorkDir.toUri().getHost(), appWorkDir.toUri().getPort(),
@@ -434,7 +441,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
    */
   private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws 
Exception {
     HelixUtils.submitJobToWorkFlow(jobConfigBuilder, this.helixWorkFlowName, 
this.jobContext.getJobId(),
-        this.helixTaskDriver, this.helixManager, 
this.workFlowExpiryTimeSeconds);
+        this.helixTaskDriver, this.helixManager,
+        Duration.ofSeconds(this.workFlowExpiryTimeSeconds),
+        Duration.ofSeconds(this.helixWorkflowSubmissionTimeoutSeconds));
   }
 
   public void launchJob(@Nullable JobListener jobListener) throws JobException 
{
@@ -447,11 +456,13 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
       if (this.runningMap.replace(this.jobContext.getJobName(), false, true)) {
         LOGGER.info("Job {} will be executed, add into running map.", 
this.jobContext.getJobId());
         isLaunched = true;
-        super.launchJob(jobListener);
+        launchJobImpl(jobListener);
       } else {
         LOGGER.warn("Job {} will not be executed because other jobs are still 
running.", this.jobContext.getJobId());
       }
-      // TODO: Better error handling
+
+      // TODO: Better error handling. The current impl swallows exceptions for 
jobs that were started by this method call.
+      // One potential way to improve the error handling is to make this error 
swallowing conifgurable
     } catch (Throwable t) {
       errorInJobLaunching = t;
     } finally {
@@ -467,6 +478,29 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
     }
   }
 
+
+  /**
+   * This method looks silly at first glance but exists for a reason.
+   *
+   * The method {@link GobblinHelixJobLauncher#launchJob(JobListener)} 
contains boiler plate for handling exceptions and
+   * mutating the runningMap to communicate state back to the {@link 
GobblinHelixJobScheduler}. The boiler plate swallows
+   * exceptions when launching the job because many use cases require that 1 
job failure should not affect other jobs by causing the
+   * entire process to fail through an uncaught exception.
+   *
+   * This method is useful for unit testing edge cases where we expect {@link 
JobException}s during the underlying launch operation.
+   * It would be nice to not swallow exceptions, but the implications of doing 
that will require careful refactoring since
+   * the class {@link GobblinHelixJobLauncher} and {@link 
GobblinHelixJobScheduler} are shared for 2 quite different cases
+   * between GaaS and streaming. GaaS typically requiring many short lifetime 
workflows (where a failure is tolerated) and
+   * streaming requiring a small number of long running workflows (where 
failure to submit is unexpected and is not
+   * tolerated)
+   *
+   * @throws JobException
+   */
+  @VisibleForTesting
+  void launchJobImpl(@Nullable JobListener jobListener) throws JobException {
+    super.launchJob(jobListener);
+  }
+
   private TaskConfig getTaskConfig(WorkUnit workUnit, ParallelRunner 
stateSerDeRunner) throws IOException {
     String workUnitFilePath =
         persistWorkUnit(new Path(this.inputWorkUnitDir, 
this.jobContext.getJobId()), workUnit, stateSerDeRunner);
@@ -585,4 +619,4 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
       this.fs.delete(jobStateFilePath, false);
     }
   }
-}
\ No newline at end of file
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 37da2c0bf..2dfb4b773 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -17,26 +17,23 @@
 
 package org.apache.gobblin.cluster;
 
-import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.locks.Lock;
 
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
 import com.google.common.util.concurrent.Striped;
 import com.typesafe.config.Config;
@@ -45,6 +42,10 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.api.JobExecutionMonitor;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
@@ -326,7 +327,10 @@ class HelixRetriggeringJobCallable implements Callable {
         // make sure the planning job is initialized (or visible) to other 
parallel running threads,
         // so that the same critical section check (querying Helix for job 
completeness)
         // can be applied.
-        HelixUtils.waitJobInitialization(planningJobHelixManager, 
newPlanningId, newPlanningId);
+        Duration submissionTimeout = Duration.ofSeconds(PropertiesUtils
+            .getPropAsLong(sysProps, 
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS,
+                
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS));
+        HelixUtils.waitJobInitialization(planningJobHelixManager, 
newPlanningId, newPlanningId, submissionTimeout);
       } finally {
         planningJobHelixManager.disconnect();
         // end of the critical section to check if a job with same job name is 
running
@@ -414,4 +418,4 @@ class HelixRetriggeringJobCallable implements Callable {
 
     
this.jobScheduler.jobSchedulerMetrics.numCancellationComplete.incrementAndGet();
   }
-}
\ No newline at end of file
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 308229b34..54e5a8108 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -17,7 +17,7 @@
 
 package org.apache.gobblin.cluster;
 
-import com.google.common.collect.Lists;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -28,15 +28,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.runtime.JobException;
-import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.runtime.listeners.JobListener;
-import org.apache.gobblin.util.Id;
-import org.apache.gobblin.util.JobLauncherUtils;
+
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
@@ -57,7 +49,20 @@ import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
 
-import static org.apache.helix.task.TaskState.*;
+import com.google.common.collect.Lists;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobException;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.listeners.JobListener;
+import org.apache.gobblin.util.Id;
+import org.apache.gobblin.util.JobLauncherUtils;
+
+import static org.apache.helix.task.TaskState.STOPPED;
 
 
 /**
@@ -112,22 +117,11 @@ public class HelixUtils {
     return namePrefix + "_" + instanceId;
   }
 
-  // We have switched from Helix JobQueue to WorkFlow based job execution.
-  @Deprecated
-  public static void submitJobToQueue(
-      JobConfig.Builder jobConfigBuilder,
-      String queueName,
-      String jobName,
-      TaskDriver helixTaskDriver,
-      HelixManager helixManager,
-      long jobQueueDeleteTimeoutSeconds) throws Exception {
-    submitJobToWorkFlow(jobConfigBuilder, queueName, jobName, helixTaskDriver, 
helixManager, jobQueueDeleteTimeoutSeconds);
-  }
-
   static void waitJobInitialization(
       HelixManager helixManager,
       String workFlowName,
-      String jobName) throws Exception {
+      String jobName,
+      Duration timeout) throws Exception {
     WorkflowContext workflowContext = 
TaskDriver.getWorkflowContext(helixManager, workFlowName);
 
     // If the helix job is deleted from some other thread or a completely 
external process,
@@ -136,11 +130,11 @@ public class HelixUtils {
     // 2) it did get initialized but deleted soon after, in which case we 
should stop waiting
     // To overcome this issue, we wait here till workflowContext gets 
initialized
     long start = System.currentTimeMillis();
-    long timeoutMillis = TimeUnit.MINUTES.toMillis(5L);
     while (workflowContext == null || 
workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, 
jobName)) == null) {
-      if (System.currentTimeMillis() - start > timeoutMillis) {
-        log.error("Job cannot be initialized within {} milliseconds, 
considered as an error", timeoutMillis);
-        throw new JobException("Job cannot be initialized within {} 
milliseconds, considered as an error");
+      if (System.currentTimeMillis() - start > timeout.toMillis()) {
+        log.error("Job cannot be initialized within {} milliseconds, 
considered as an error", timeout.toMillis());
+        throw new JobException(String.format("Job cannot be initialized within 
%s milliseconds, considered as an error",
+            timeout.toMillis()));
       }
       workflowContext = TaskDriver.getWorkflowContext(helixManager, 
workFlowName);
       Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
@@ -235,16 +229,17 @@ public class HelixUtils {
       String jobName,
       TaskDriver helixTaskDriver,
       HelixManager helixManager,
-      long workFlowExpiryTime) throws Exception {
+      Duration workFlowExpiryTime,
+      Duration submissionTimeout) throws Exception {
 
-    WorkflowConfig workFlowConfig = new 
WorkflowConfig.Builder().setExpiry(workFlowExpiryTime, 
TimeUnit.SECONDS).build();
+    WorkflowConfig workFlowConfig = new 
WorkflowConfig.Builder().setExpiry(workFlowExpiryTime.getSeconds(), 
TimeUnit.SECONDS).build();
     // Create a work flow for each job with the name being the queue name
     Workflow workFlow = new 
Workflow.Builder(workFlowName).setWorkflowConfig(workFlowConfig).addJob(jobName,
 jobConfigBuilder).build();
     // start the workflow
     helixTaskDriver.start(workFlow);
     log.info("Created a work flow {}", workFlowName);
 
-    waitJobInitialization(helixManager, workFlowName, jobName);
+    waitJobInitialization(helixManager, workFlowName, jobName, 
submissionTimeout);
   }
 
   static void waitJobCompletion(HelixManager helixManager, String 
workFlowName, String jobName,
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index e171591d5..975dad38b 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -286,6 +286,19 @@ public class GobblinHelixJobLauncherTest {
     Assert.assertEquals(testListener.getCompletes().get() == 1, true);
   }
 
+  public void testTimeout() throws Exception {
+    final ConcurrentHashMap<String, Boolean> runningMap = new 
ConcurrentHashMap<>();
+
+    final Properties props = generateJobProperties(this.baseConfig, 
"testTimeoutTest", "_12345");
+    
props.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS,
 "0");
+    final GobblinHelixJobLauncher gobblinHelixJobLauncher = 
this.closer.register(
+        new GobblinHelixJobLauncher(props, this.helixManager, this.appWorkDir, 
ImmutableList.<Tag<?>>of(), runningMap,
+            java.util.Optional.empty()));
+
+    // using launchJobImpl because we do not want to swallow the exception
+    Assert.assertThrows(JobException.class, () -> 
gobblinHelixJobLauncher.launchJobImpl(null));
+  }
+
   @Test(enabled = false, dependsOnMethods = {"testLaunchJob", 
"testLaunchMultipleJobs"})
   public void testJobCleanup() throws Exception {
     final ConcurrentHashMap<String, Boolean> runningMap = new 
ConcurrentHashMap<>();

Reply via email to