This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b6b49c714 [GOBBLIN-1847] Exceptions in the JobLauncher should try to
delete the existing workflow if it is launched (#3711)
b6b49c714 is described below
commit b6b49c7144a0f7663ae4e80b336bf44f2de07253
Author: Matthew Ho <[email protected]>
AuthorDate: Wed Jun 28 10:51:14 2023 -0700
[GOBBLIN-1847] Exceptions in the JobLauncher should try to delete the
existing workflow if it is launched (#3711)
* [GOBBLIN-1847] Exceptions in the JobLauncher should try to delete the
existing workflow if it is launched
* Only cancel helix workflow if failure occurs during startup
---
.../gobblin/cluster/GobblinHelixJobLauncher.java | 6 ++++-
.../cluster/GobblinHelixJobLauncherTest.java | 31 ++++++++++++++++++++++
2 files changed, 36 insertions(+), 1 deletion(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index c0578a997..cf324b443 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -462,9 +462,13 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
}
// TODO: Better error handling. The current impl swallows exceptions for
jobs that were started by this method call.
- // One potential way to improve the error handling is to make this error
swallowing conifgurable
+ // One potential way to improve the error handling is to make this error
swallowing configurable
} catch (Throwable t) {
errorInJobLaunching = t;
+ if (isLaunched) {
+ // Attempts to cancel the helix workflow if an error occurs during
launch
+ cancelJob(jobListener);
+ }
} finally {
if (isLaunched) {
if (this.runningMap.replace(this.jobContext.getJobName(), true,
false)) {
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index 975dad38b..d475688e5 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -38,6 +38,7 @@ import org.apache.helix.InstanceType;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -63,6 +64,7 @@ import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.listeners.AbstractJobListener;
+import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
@@ -299,6 +301,35 @@ public class GobblinHelixJobLauncherTest {
Assert.assertThrows(JobException.class, () ->
gobblinHelixJobLauncher.launchJobImpl(null));
}
+ public void testCancelJobOnFailureDuringLaunch() throws Exception {
+ final ConcurrentHashMap<String, Boolean> runningMap = new
ConcurrentHashMap<>();
+ final Properties props = generateJobProperties(this.baseConfig,
"testDoesCancelOnFailure", "_12345");
+
props.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS,
"0");
+
+ final GobblinHelixJobLauncher gobblinHelixJobLauncher =
this.closer.register(
+ new GobblinHelixJobLauncher(props, this.helixManager, this.appWorkDir,
ImmutableList.<Tag<?>>of(), runningMap,
+ java.util.Optional.empty()));
+
+ // The launchJob will throw an exception (see testTimeout test) and we
expect the launcher to swallow the exception,
+ // then still properly call cancel. We use the listener to confirm
the cancel hook was correctly called once
+ JobListener mockListener = Mockito.mock(JobListener.class);
+ gobblinHelixJobLauncher.launchJob(mockListener);
+
Mockito.verify(mockListener).onJobCancellation(Mockito.any(JobContext.class));
+ }
+
+ public void testNoCancelWhenJobCompletesSuccessfully() throws Exception {
+ final ConcurrentHashMap<String, Boolean> runningMap = new
ConcurrentHashMap<>();
+ final Properties props = generateJobProperties(this.baseConfig,
"testDoesNotCancelOnSuccess", "_12345");
+ final GobblinHelixJobLauncher gobblinHelixJobLauncher =
this.closer.register(
+ new GobblinHelixJobLauncher(props, this.helixManager, this.appWorkDir,
ImmutableList.<Tag<?>>of(), runningMap,
+ java.util.Optional.empty()));
+
+ // When the job finishes successfully, the cancellation hook should not be
invoked
+ JobListener mockListener = Mockito.mock(JobListener.class);
+ gobblinHelixJobLauncher.launchJob(mockListener);
+ Mockito.verify(mockListener,
Mockito.never()).onJobCancellation(Mockito.any(JobContext.class));
+ }
+
@Test(enabled = false, dependsOnMethods = {"testLaunchJob",
"testLaunchMultipleJobs"})
public void testJobCleanup() throws Exception {
final ConcurrentHashMap<String, Boolean> runningMap = new
ConcurrentHashMap<>();