This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 44ed806c3 [GOBBLIN-1813] Helix workflows submission timeouts are
configurable (#3677)
44ed806c3 is described below
commit 44ed806c365fe6750ce7d46f4072ce68130ab93c
Author: Matthew Ho <[email protected]>
AuthorDate: Thu Apr 13 10:51:05 2023 -0700
[GOBBLIN-1813] Helix workflows submission timeouts are configurable (#3677)
* [GOBBLIN-1813] Helix workflows submission timeouts are configurable
* Use duration and add more detail to the todo
---
.../cluster/GobblinClusterConfigurationKeys.java | 5 +-
...GobblinHelixDistributeJobExecutionLauncher.java | 9 +++-
.../gobblin/cluster/GobblinHelixJobLauncher.java | 42 ++++++++++++++--
.../cluster/HelixRetriggeringJobCallable.java | 18 ++++---
.../org/apache/gobblin/cluster/HelixUtils.java | 57 ++++++++++------------
.../cluster/GobblinHelixJobLauncherTest.java | 13 +++++
6 files changed, 100 insertions(+), 44 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index d2a60781d..31a8547aa 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -158,6 +158,9 @@ public class GobblinClusterConfigurationKeys {
public static final String HELIX_TASK_TIMEOUT_SECONDS =
"helix.task.timeout.seconds";
public static final String HELIX_TASK_MAX_ATTEMPTS_KEY =
"helix.task.maxAttempts";
+ public static final String HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS =
GOBBLIN_CLUSTER_PREFIX + "workflowSubmissionTimeoutSeconds";
+ public static final long DEFAULT_HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS =
300;
+
public static final String HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS =
GOBBLIN_CLUSTER_PREFIX + "workflowDeleteTimeoutSeconds";
public static final long DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS = 300;
@@ -219,4 +222,4 @@ public class GobblinClusterConfigurationKeys {
public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX +
"containerId";
public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX =
GOBBLIN_CLUSTER_PREFIX + "sysProps";
-}
\ No newline at end of file
+}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index f2ba083c5..6faaebd63 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.cluster;
import java.io.Closeable;
import java.io.IOException;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -96,6 +97,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements
JobExecutionLauncher
private final long helixJobStopTimeoutSeconds;
+ private final long helixWorkflowSubmissionTimeoutSeconds;
+
private boolean jobSubmitted;
// A conditional variable for which the condition is satisfied if a
cancellation is requested
@@ -135,6 +138,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements
JobExecutionLauncher
this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(this.combinedConfigs,
GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS);
+ this.helixWorkflowSubmissionTimeoutSeconds =
ConfigUtils.getLong(this.combinedConfigs,
+
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS,
+
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS);
}
@Override
@@ -252,7 +258,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements
JobExecutionLauncher
jobId,
taskDriver,
this.planningJobHelixManager,
- this.workFlowExpiryTimeSeconds);
+ Duration.ofSeconds(this.workFlowExpiryTimeSeconds),
+ Duration.ofSeconds(this.helixWorkflowSubmissionTimeoutSeconds));
this.jobSubmitted = true;
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 6308ddc2d..c0578a997 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.cluster;
import java.io.IOException;
import java.net.URI;
+import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -44,6 +45,7 @@ import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
@@ -131,6 +133,7 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
private final Config jobConfig;
private final long workFlowExpiryTimeSeconds;
private final long helixJobStopTimeoutSeconds;
+ private final long helixWorkflowSubmissionTimeoutSeconds;
private Map<String, TaskConfig> workUnitToHelixConfig;
private Retryer<Boolean> taskRetryer;
@@ -163,6 +166,10 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
ConfigUtils.getLong(jobConfig,
GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS);
+ this.helixWorkflowSubmissionTimeoutSeconds = ConfigUtils.getLong(jobConfig,
+
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS,
+
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS);
+
Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps)
.withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY,
ConfigValueFactory.fromAnyRef(
new URI(appWorkDir.toUri().getScheme(), null,
appWorkDir.toUri().getHost(), appWorkDir.toUri().getPort(),
@@ -434,7 +441,9 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
*/
private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws
Exception {
HelixUtils.submitJobToWorkFlow(jobConfigBuilder, this.helixWorkFlowName,
this.jobContext.getJobId(),
- this.helixTaskDriver, this.helixManager,
this.workFlowExpiryTimeSeconds);
+ this.helixTaskDriver, this.helixManager,
+ Duration.ofSeconds(this.workFlowExpiryTimeSeconds),
+ Duration.ofSeconds(this.helixWorkflowSubmissionTimeoutSeconds));
}
public void launchJob(@Nullable JobListener jobListener) throws JobException
{
@@ -447,11 +456,13 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
if (this.runningMap.replace(this.jobContext.getJobName(), false, true)) {
LOGGER.info("Job {} will be executed, add into running map.",
this.jobContext.getJobId());
isLaunched = true;
- super.launchJob(jobListener);
+ launchJobImpl(jobListener);
} else {
LOGGER.warn("Job {} will not be executed because other jobs are still
running.", this.jobContext.getJobId());
}
- // TODO: Better error handling
+
+ // TODO: Better error handling. The current impl swallows exceptions for
jobs that were started by this method call.
+ // One potential way to improve the error handling is to make this error
swallowing configurable
} catch (Throwable t) {
errorInJobLaunching = t;
} finally {
@@ -467,6 +478,29 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
}
}
+
+ /**
+ * This method looks silly at first glance but exists for a reason.
+ *
+ * The method {@link GobblinHelixJobLauncher#launchJob(JobListener)}
contains boiler plate for handling exceptions and
+ * mutating the runningMap to communicate state back to the {@link
GobblinHelixJobScheduler}. The boiler plate swallows
+ * exceptions when launching the job because many use cases require that 1
job failure should not affect other jobs by causing the
+ * entire process to fail through an uncaught exception.
+ *
+ * This method is useful for unit testing edge cases where we expect {@link
JobException}s during the underlying launch operation.
+ * It would be nice to not swallow exceptions, but the implications of doing
that will require careful refactoring since
+ * the class {@link GobblinHelixJobLauncher} and {@link
GobblinHelixJobScheduler} are shared for 2 quite different cases
+ * between GaaS and streaming. GaaS typically requiring many short lifetime
workflows (where a failure is tolerated) and
+ * streaming requiring a small number of long running workflows (where
failure to submit is unexpected and is not
+ * tolerated)
+ *
+ * @throws JobException
+ */
+ @VisibleForTesting
+ void launchJobImpl(@Nullable JobListener jobListener) throws JobException {
+ super.launchJob(jobListener);
+ }
+
private TaskConfig getTaskConfig(WorkUnit workUnit, ParallelRunner
stateSerDeRunner) throws IOException {
String workUnitFilePath =
persistWorkUnit(new Path(this.inputWorkUnitDir,
this.jobContext.getJobId()), workUnit, stateSerDeRunner);
@@ -585,4 +619,4 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
this.fs.delete(jobStateFilePath, false);
}
}
-}
\ No newline at end of file
+}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 37da2c0bf..2dfb4b773 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -17,26 +17,23 @@
package org.apache.gobblin.cluster;
-import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Striped;
import com.typesafe.config.Config;
@@ -45,6 +42,10 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.api.JobExecutionMonitor;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
@@ -326,7 +327,10 @@ class HelixRetriggeringJobCallable implements Callable {
// make sure the planning job is initialized (or visible) to other
parallel running threads,
// so that the same critical section check (querying Helix for job
completeness)
// can be applied.
- HelixUtils.waitJobInitialization(planningJobHelixManager,
newPlanningId, newPlanningId);
+ Duration submissionTimeout = Duration.ofSeconds(PropertiesUtils
+ .getPropAsLong(sysProps,
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS,
+
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS));
+ HelixUtils.waitJobInitialization(planningJobHelixManager,
newPlanningId, newPlanningId, submissionTimeout);
} finally {
planningJobHelixManager.disconnect();
// end of the critical section to check if a job with same job name is
running
@@ -414,4 +418,4 @@ class HelixRetriggeringJobCallable implements Callable {
this.jobScheduler.jobSchedulerMetrics.numCancellationComplete.incrementAndGet();
}
-}
\ No newline at end of file
+}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 308229b34..54e5a8108 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -17,7 +17,7 @@
package org.apache.gobblin.cluster;
-import com.google.common.collect.Lists;
+import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -28,15 +28,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.runtime.JobException;
-import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.runtime.listeners.JobListener;
-import org.apache.gobblin.util.Id;
-import org.apache.gobblin.util.JobLauncherUtils;
+
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
@@ -57,7 +49,20 @@ import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.tools.ClusterSetup;
-import static org.apache.helix.task.TaskState.*;
+import com.google.common.collect.Lists;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobException;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.listeners.JobListener;
+import org.apache.gobblin.util.Id;
+import org.apache.gobblin.util.JobLauncherUtils;
+
+import static org.apache.helix.task.TaskState.STOPPED;
/**
@@ -112,22 +117,11 @@ public class HelixUtils {
return namePrefix + "_" + instanceId;
}
- // We have switched from Helix JobQueue to WorkFlow based job execution.
- @Deprecated
- public static void submitJobToQueue(
- JobConfig.Builder jobConfigBuilder,
- String queueName,
- String jobName,
- TaskDriver helixTaskDriver,
- HelixManager helixManager,
- long jobQueueDeleteTimeoutSeconds) throws Exception {
- submitJobToWorkFlow(jobConfigBuilder, queueName, jobName, helixTaskDriver,
helixManager, jobQueueDeleteTimeoutSeconds);
- }
-
static void waitJobInitialization(
HelixManager helixManager,
String workFlowName,
- String jobName) throws Exception {
+ String jobName,
+ Duration timeout) throws Exception {
WorkflowContext workflowContext =
TaskDriver.getWorkflowContext(helixManager, workFlowName);
// If the helix job is deleted from some other thread or a completely
external process,
@@ -136,11 +130,11 @@ public class HelixUtils {
// 2) it did get initialized but deleted soon after, in which case we
should stop waiting
// To overcome this issue, we wait here till workflowContext gets
initialized
long start = System.currentTimeMillis();
- long timeoutMillis = TimeUnit.MINUTES.toMillis(5L);
while (workflowContext == null ||
workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName,
jobName)) == null) {
- if (System.currentTimeMillis() - start > timeoutMillis) {
- log.error("Job cannot be initialized within {} milliseconds,
considered as an error", timeoutMillis);
- throw new JobException("Job cannot be initialized within {}
milliseconds, considered as an error");
+ if (System.currentTimeMillis() - start > timeout.toMillis()) {
+ log.error("Job cannot be initialized within {} milliseconds,
considered as an error", timeout.toMillis());
+ throw new JobException(String.format("Job cannot be initialized within
%s milliseconds, considered as an error",
+ timeout.toMillis()));
}
workflowContext = TaskDriver.getWorkflowContext(helixManager,
workFlowName);
Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
@@ -235,16 +229,17 @@ public class HelixUtils {
String jobName,
TaskDriver helixTaskDriver,
HelixManager helixManager,
- long workFlowExpiryTime) throws Exception {
+ Duration workFlowExpiryTime,
+ Duration submissionTimeout) throws Exception {
- WorkflowConfig workFlowConfig = new
WorkflowConfig.Builder().setExpiry(workFlowExpiryTime,
TimeUnit.SECONDS).build();
+ WorkflowConfig workFlowConfig = new
WorkflowConfig.Builder().setExpiry(workFlowExpiryTime.getSeconds(),
TimeUnit.SECONDS).build();
// Create a work flow for each job with the name being the queue name
Workflow workFlow = new
Workflow.Builder(workFlowName).setWorkflowConfig(workFlowConfig).addJob(jobName,
jobConfigBuilder).build();
// start the workflow
helixTaskDriver.start(workFlow);
log.info("Created a work flow {}", workFlowName);
- waitJobInitialization(helixManager, workFlowName, jobName);
+ waitJobInitialization(helixManager, workFlowName, jobName,
submissionTimeout);
}
static void waitJobCompletion(HelixManager helixManager, String
workFlowName, String jobName,
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index e171591d5..975dad38b 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -286,6 +286,19 @@ public class GobblinHelixJobLauncherTest {
Assert.assertEquals(testListener.getCompletes().get() == 1, true);
}
+ public void testTimeout() throws Exception {
+ final ConcurrentHashMap<String, Boolean> runningMap = new
ConcurrentHashMap<>();
+
+ final Properties props = generateJobProperties(this.baseConfig,
"testTimeoutTest", "_12345");
+
props.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS,
"0");
+ final GobblinHelixJobLauncher gobblinHelixJobLauncher =
this.closer.register(
+ new GobblinHelixJobLauncher(props, this.helixManager, this.appWorkDir,
ImmutableList.<Tag<?>>of(), runningMap,
+ java.util.Optional.empty()));
+
+ // using launchJobImpl because we do not want to swallow the exception
+ Assert.assertThrows(JobException.class, () ->
gobblinHelixJobLauncher.launchJobImpl(null));
+ }
+
@Test(enabled = false, dependsOnMethods = {"testLaunchJob",
"testLaunchMultipleJobs"})
public void testJobCleanup() throws Exception {
final ConcurrentHashMap<String, Boolean> runningMap = new
ConcurrentHashMap<>();