This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b6b49c714 [GOBBLIN-1847] Exceptions in the JobLauncher should try to 
delete the existing workflow if it is launched (#3711)
b6b49c714 is described below

commit b6b49c7144a0f7663ae4e80b336bf44f2de07253
Author: Matthew Ho <[email protected]>
AuthorDate: Wed Jun 28 10:51:14 2023 -0700

    [GOBBLIN-1847] Exceptions in the JobLauncher should try to delete the 
existing workflow if it is launched (#3711)
    
    * [GOBBLIN-1847] Exceptions in the JobLauncher should try to delete the 
existing workflow if it is launched
    
    * Only cancel helix workflow if failure occurs during startup
---
 .../gobblin/cluster/GobblinHelixJobLauncher.java   |  6 ++++-
 .../cluster/GobblinHelixJobLauncherTest.java       | 31 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index c0578a997..cf324b443 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -462,9 +462,13 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
       }
 
       // TODO: Better error handling. The current impl swallows exceptions for 
jobs that were started by this method call.
-      // One potential way to improve the error handling is to make this error 
swallowing conifgurable
+      // One potential way to improve the error handling is to make this error 
swallowing configurable
     } catch (Throwable t) {
       errorInJobLaunching = t;
+      if (isLaunched) {
+        // Attempts to cancel the helix workflow if an error occurs during 
launch
+        cancelJob(jobListener);
+      }
     } finally {
       if (isLaunched) {
         if (this.runningMap.replace(this.jobContext.getJobName(), true, 
false)) {
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index 975dad38b..d475688e5 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -38,6 +38,7 @@ import org.apache.helix.InstanceType;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -63,6 +64,7 @@ import org.apache.gobblin.runtime.JobContext;
 import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.listeners.AbstractJobListener;
+import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 
@@ -299,6 +301,35 @@ public class GobblinHelixJobLauncherTest {
     Assert.assertThrows(JobException.class, () -> 
gobblinHelixJobLauncher.launchJobImpl(null));
   }
 
+  public void testCancelJobOnFailureDuringLaunch() throws Exception {
+    final ConcurrentHashMap<String, Boolean> runningMap = new 
ConcurrentHashMap<>();
+    final Properties props = generateJobProperties(this.baseConfig, 
"testDoesCancelOnFailure", "_12345");
+    
props.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS,
 "0");
+
+    final GobblinHelixJobLauncher gobblinHelixJobLauncher = 
this.closer.register(
+        new GobblinHelixJobLauncher(props, this.helixManager, this.appWorkDir, 
ImmutableList.<Tag<?>>of(), runningMap,
+            java.util.Optional.empty()));
+
+    // The launchJob will throw an exception (see testTimeout test) and we 
expect the launcher to swallow the exception,
+    // then still properly call cancel. We use the listener to confirm 
the cancel hook was correctly called once
+    JobListener mockListener = Mockito.mock(JobListener.class);
+    gobblinHelixJobLauncher.launchJob(mockListener);
+    
Mockito.verify(mockListener).onJobCancellation(Mockito.any(JobContext.class));
+  }
+
+  public void testNoCancelWhenJobCompletesSuccessfully() throws Exception {
+    final ConcurrentHashMap<String, Boolean> runningMap = new 
ConcurrentHashMap<>();
+    final Properties props = generateJobProperties(this.baseConfig, 
"testDoesNotCancelOnSuccess", "_12345");
+    final GobblinHelixJobLauncher gobblinHelixJobLauncher = 
this.closer.register(
+        new GobblinHelixJobLauncher(props, this.helixManager, this.appWorkDir, 
ImmutableList.<Tag<?>>of(), runningMap,
+            java.util.Optional.empty()));
+
+    // When the job finishes successfully, the cancellation hook should not be 
invoked
+    JobListener mockListener = Mockito.mock(JobListener.class);
+    gobblinHelixJobLauncher.launchJob(mockListener);
+    Mockito.verify(mockListener, 
Mockito.never()).onJobCancellation(Mockito.any(JobContext.class));
+  }
+
   @Test(enabled = false, dependsOnMethods = {"testLaunchJob", 
"testLaunchMultipleJobs"})
   public void testJobCleanup() throws Exception {
     final ConcurrentHashMap<String, Boolean> runningMap = new 
ConcurrentHashMap<>();

Reply via email to