This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 20c434188f [GOBBLIN-2148] Add temporal workflow cancel support (#4045)
20c434188f is described below

commit 20c434188ff0715110d5f708b9e16ed8612059c4
Author: abhishekmjain <[email protected]>
AuthorDate: Mon Oct 7 23:24:39 2024 +0530

    [GOBBLIN-2148] Add temporal workflow cancel support (#4045)
    
    * Add temporal workflow cancel support
---
 .../ddm/launcher/ExecuteGobblinJobLauncher.java    |   4 +-
 .../ddm/launcher/GenerateWorkUnitsJobLauncher.java |   3 +-
 .../ddm/launcher/ProcessWorkUnitsJobLauncher.java  |   3 +-
 .../temporal/joblauncher/GobblinJobLauncher.java   |  25 ++-
 .../joblauncher/GobblinTemporalJobLauncher.java    |  65 ++++++-
 .../joblauncher/GobblinTemporalJobScheduler.java   |   8 +
 .../GobblinTemporalJobLauncherTest.java            | 204 +++++++++++++++++++++
 .../org.mockito.plugins.MockMaker                  |   1 +
 8 files changed, 303 insertions(+), 10 deletions(-)

diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
index 4c2e44c5bd..b2f65ccb41 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
@@ -83,10 +83,12 @@ public class ExecuteGobblinJobLauncher extends GobblinTemporalJobLauncher {
   public void submitJob(List<WorkUnit> workunits) {
     try {
       Properties finalProps = adjustJobProperties(this.jobProps);
+      // Initialize workflowId.
+      this.workflowId = 
Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, 
ConfigFactory.parseProperties(finalProps));
       WorkflowOptions options = WorkflowOptions.newBuilder()
           .setTaskQueue(this.queueName)
-          
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, 
ConfigFactory.parseProperties(finalProps)))
           
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(finalProps))
+          .setWorkflowId(this.workflowId)
           .build();
       ExecuteGobblinWorkflow workflow = 
this.client.newWorkflowStub(ExecuteGobblinWorkflow.class, options);
 
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
index ce0d8b732b..b058472c6b 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
@@ -71,10 +71,11 @@ public class GenerateWorkUnitsJobLauncher extends GobblinTemporalJobLauncher {
   @Override
   public void submitJob(List<WorkUnit> workunits) {
     try {
+      this.workflowId = 
Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, 
ConfigFactory.parseProperties(jobProps));
       WorkflowOptions options = WorkflowOptions.newBuilder()
           .setTaskQueue(this.queueName)
           
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps))
-          
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, 
ConfigFactory.parseProperties(jobProps)))
+          .setWorkflowId(this.workflowId)
           .build();
       GenerateWorkUnitsWorkflow workflow = 
this.client.newWorkflowStub(GenerateWorkUnitsWorkflow.class, options);
 
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index d0ce87c05d..709a9cf935 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -102,10 +102,11 @@ public class ProcessWorkUnitsJobLauncher extends GobblinTemporalJobLauncher {
           GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX,
           
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX));
 
+      this.workflowId = 
Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec, 
ConfigFactory.parseProperties(jobProps));
       WorkflowOptions options = WorkflowOptions.newBuilder()
           .setTaskQueue(this.queueName)
           
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps))
-          
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec, 
ConfigFactory.parseProperties(jobProps)))
+          .setWorkflowId(this.workflowId)
           .build();
 
       Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec, 
Help.loadFileSystem(wuSpec)));
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
index 12d8861ecd..ea2c2ce7c2 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
@@ -22,6 +22,10 @@ import java.net.URI;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -87,7 +91,7 @@ public abstract class GobblinJobLauncher extends 
AbstractJobLauncher {
   protected final StateStores stateStores;
   protected JobListener jobListener;
   protected volatile boolean jobSubmitted = false;
-
+  private final ExecutorService executor;
 
   public GobblinJobLauncher(Properties jobProps, Path appWorkDir,
       List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> 
runningMap, EventBus eventbus)
@@ -122,6 +126,7 @@ public abstract class GobblinJobLauncher extends 
AbstractJobLauncher {
     this.taskStateCollectorService =
         new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), 
this.eventBus, this.eventSubmitter,
             this.stateStores.getTaskStateStore(), this.outputTaskStateDir, 
this.getIssueRepository());
+    this.executor = Executors.newSingleThreadExecutor();
   }
 
   @Override
@@ -150,17 +155,23 @@ public abstract class GobblinJobLauncher extends 
AbstractJobLauncher {
       // Start the output TaskState collector service
       this.taskStateCollectorService.startAsync().awaitRunning();
 
+      Future<?> submitJobFuture = null;
       synchronized (this.cancellationRequest) {
         if (!this.cancellationRequested) {
-          submitJob(workUnits);
+          submitJobFuture = executor.submit(() -> {
+            try {
+              submitJob(workUnits);
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          });
           log.info(String.format("Submitted job %s", 
this.jobContext.getJobId()));
           this.jobSubmitted = true;
         } else {
           log.warn("Job {} not submitted as it was requested to be 
cancelled.", this.jobContext.getJobId());
         }
       }
-
-      waitJob();
+      waitJob(submitJobFuture);
       log.info(String.format("Job %s completed", this.jobContext.getJobId()));
     } finally {
       // The last iteration of output TaskState collecting will run when the 
collector service gets stopped
@@ -172,7 +183,11 @@ public abstract class GobblinJobLauncher extends 
AbstractJobLauncher {
   protected void submitJob(List<WorkUnit> workUnits) throws Exception {
   }
 
-  protected void waitJob() throws InterruptedException {
+  protected void waitJob(Future<?> submitJobFuture)
+      throws InterruptedException, ExecutionException {
+    if (submitJobFuture != null) {
+      submitJobFuture.get();
+    }
   }
 
   @Override
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
index 82aeb8b20f..2d17fe20a3 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
@@ -20,12 +20,19 @@ package org.apache.gobblin.temporal.joblauncher;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import com.google.common.eventbus.EventBus;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import io.temporal.api.enums.v1.WorkflowExecutionStatus;
+import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
+import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
 import io.temporal.client.WorkflowClient;
+import io.temporal.client.WorkflowFailedException;
+import io.temporal.client.WorkflowStub;
 import io.temporal.serviceclient.WorkflowServiceStubs;
 import io.temporal.workflow.Workflow;
 
@@ -65,10 +72,13 @@ import static 
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClien
 @Alpha
 public abstract class GobblinTemporalJobLauncher extends GobblinJobLauncher {
   private static final Logger log = 
Workflow.getLogger(GobblinTemporalJobLauncher.class);
+  private static final int TERMINATION_TIMEOUT_SECONDS = 3;
 
   protected WorkflowServiceStubs workflowServiceStubs;
   protected WorkflowClient client;
   protected String queueName;
+  protected String namespace;
+  protected String workflowId;
 
   public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir,
                                     List<? extends Tag<?>> metadataTags, 
ConcurrentHashMap<String, Boolean> runningMap, EventBus eventBus)
@@ -79,11 +89,13 @@ public abstract class GobblinTemporalJobLauncher extends 
GobblinJobLauncher {
     String connectionUri = jobProps.getProperty(TEMPORAL_CONNECTION_STRING);
     this.workflowServiceStubs = createServiceInstance(connectionUri);
 
-    String namespace = jobProps.getProperty(GOBBLIN_TEMPORAL_NAMESPACE, 
DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
+    this.namespace = jobProps.getProperty(GOBBLIN_TEMPORAL_NAMESPACE, 
DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
     this.client = createClientInstance(workflowServiceStubs, namespace);
 
     this.queueName = jobProps.getProperty(GOBBLIN_TEMPORAL_TASK_QUEUE, 
DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);
 
+    // non-null value indicates job has been submitted
+    this.workflowId = null;
     startCancellationExecutor();
   }
 
@@ -113,7 +125,56 @@ public abstract class GobblinTemporalJobLauncher extends 
GobblinJobLauncher {
 
   @Override
   protected void executeCancellation() {
-    log.info("Cancel temporal workflow");
+    if (this.workflowId == null) {
+      log.info("Cancellation of temporal workflow attempted without submitting 
it");
+      return;
+    }
+
+    log.info("Cancelling temporal workflow {}", this.workflowId);
+    try {
+      WorkflowStub workflowStub = 
this.client.newUntypedWorkflowStub(this.workflowId);
+
+      // Describe the workflow execution to get its status
+      DescribeWorkflowExecutionRequest request = 
DescribeWorkflowExecutionRequest.newBuilder()
+          .setNamespace(this.namespace)
+          .setExecution(workflowStub.getExecution())
+          .build();
+      DescribeWorkflowExecutionResponse response = 
workflowServiceStubs.blockingStub().describeWorkflowExecution(request);
+
+      WorkflowExecutionStatus status;
+      try {
+        status = response.getWorkflowExecutionInfo().getStatus();
+      } catch (Exception e) {
+        log.warn("Exception occurred while getting status of the workflow " + 
this.workflowId
+            + ". We would still attempt the cancellation", e);
+        workflowStub.cancel();
+        log.info("Temporal workflow {} cancelled successfully", 
this.workflowId);
+        return;
+      }
+
+      // Check if the workflow is not finished
+      if (status != 
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED &&
+          status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED &&
+          status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED 
&&
+          status != 
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED) {
+        workflowStub.cancel();
+        try {
+          // Check workflow status, if it is cancelled, will throw 
WorkflowFailedException else TimeoutException
+          workflowStub.getResult(TERMINATION_TIMEOUT_SECONDS, 
TimeUnit.SECONDS, String.class, String.class);
+        } catch (TimeoutException te) {
+          // Workflow is still running, terminate it.
+          log.info("Workflow is still running, will attempt termination", te);
+          workflowStub.terminate("Job cancel invoked");
+        } catch (WorkflowFailedException wfe) {
+          // Do nothing as exception is expected.
+        }
+        log.info("Temporal workflow {} cancelled successfully", 
this.workflowId);
+      } else {
+        log.info("Workflow {} is already finished with status {}", 
this.workflowId, status);
+      }
+    } catch (Exception e) {
+      log.error("Exception occurred while cancelling the workflow " + 
this.workflowId, e);
+    }
   }
 
   /** No-op: merely logs a warning, since not expected to be invoked */
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
index 76629aa68d..34a1cec4dc 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
@@ -210,6 +210,14 @@ public class GobblinTemporalJobScheduler extends JobScheduler implements Standar
         LOGGER.info("No job schedule found, so running job " + jobUri);
         GobblinTemporalJobLauncherListener listener = new 
GobblinTemporalJobLauncherListener(this.launcherMetrics);
         JobLauncher launcher = buildJobLauncher(newJobArrival.getJobConfig());
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+          try {
+            launcher.cancelJob(listener);
+          } catch (JobException e) {
+            LOGGER.error("Failed to cancel the job during shutdown", e);
+            throw new RuntimeException(e);
+          }
+        }));
         launcher.launchJob(listener);
       }
     } catch (Exception je) {
diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
new file mode 100644
index 0000000000..98d0379b65
--- /dev/null
+++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.joblauncher;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.fs.Path;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Files;
+
+import io.temporal.api.common.v1.WorkflowExecution;
+import io.temporal.api.enums.v1.WorkflowExecutionStatus;
+import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
+import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
+import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
+import io.temporal.client.WorkflowClient;
+import io.temporal.client.WorkflowStub;
+import io.temporal.serviceclient.WorkflowServiceStubs;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.example.simplejson.SimpleJsonSource;
+import org.apache.gobblin.runtime.locks.FileBasedJobLock;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import 
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory;
+import org.apache.gobblin.util.JobLauncherUtils;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class GobblinTemporalJobLauncherTest {
+
+  private GobblinTemporalJobLauncher jobLauncher;
+  private MockedStatic<TemporalWorkflowClientFactory> 
mockWorkflowClientFactory;
+  private WorkflowServiceStubs mockServiceStubs;
+  private WorkflowClient mockClient;
+  private WorkflowStub mockStub;
+  private WorkflowExecutionInfo mockExecutionInfo;
+  private Properties jobProperties;
+
+  class GobblinTemporalJobLauncherForTest extends GobblinTemporalJobLauncher {
+    public GobblinTemporalJobLauncherForTest(Properties jobProperties, Path 
appWorkDir) throws Exception {
+      super(jobProperties, appWorkDir, new ArrayList<>(), new 
ConcurrentHashMap<>(), null);
+    }
+
+    @Override
+    protected void submitJob(List<WorkUnit> workUnits)
+        throws Exception {
+      this.workflowId = "someWorkflowId";
+    }
+  }
+
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    mockServiceStubs = mock(WorkflowServiceStubs.class);
+    mockClient = mock(WorkflowClient.class);
+    mockExecutionInfo = mock(WorkflowExecutionInfo.class);
+    DescribeWorkflowExecutionResponse mockResponse = 
mock(DescribeWorkflowExecutionResponse.class);
+    WorkflowServiceGrpc.WorkflowServiceBlockingStub mockBlockingStub = 
mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class);
+    when(mockServiceStubs.blockingStub()).thenReturn(mockBlockingStub);
+    
when(mockBlockingStub.describeWorkflowExecution(Mockito.any())).thenReturn(mockResponse);
+    
when(mockResponse.getWorkflowExecutionInfo()).thenReturn(mockExecutionInfo);
+
+    mockWorkflowClientFactory = 
Mockito.mockStatic(TemporalWorkflowClientFactory.class);
+    mockWorkflowClientFactory.when(() -> 
TemporalWorkflowClientFactory.createServiceInstance(Mockito.anyString()))
+        .thenReturn(mockServiceStubs);
+    mockWorkflowClientFactory.when(() -> 
TemporalWorkflowClientFactory.createClientInstance(Mockito.any(), 
Mockito.anyString()))
+        .thenReturn(mockClient);
+
+    jobProperties = new Properties();
+    jobProperties.setProperty(ConfigurationKeys.FS_URI_KEY, "file:///");
+    
jobProperties.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING,
 "someConnString");
+    jobProperties.setProperty(ConfigurationKeys.JOB_LOCK_TYPE, 
FileBasedJobLock.class.getName());
+    jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, 
SimpleJsonSource.class.getName());
+  }
+
+  @BeforeMethod
+  public void methodSetUp() throws Exception {
+    mockStub = mock(WorkflowStub.class);
+    
when(mockClient.newUntypedWorkflowStub(Mockito.anyString())).thenReturn(mockStub);
+    
when(mockStub.getExecution()).thenReturn(WorkflowExecution.getDefaultInstance());
+
+    File tmpDir = Files.createTempDir();
+    String basePath = tmpDir.getAbsolutePath();
+    Path appWorkDir = new Path(basePath, "testAppWorkDir");
+    String jobLockDir = new Path(basePath, "jobLockDir").toString();
+    String stateStoreDir = new Path(basePath, "stateStoreDir").toString();
+    String jobName = "testJob";
+    String jobId = JobLauncherUtils.newJobId(jobName);
+    jobProperties.setProperty(ConfigurationKeys.JOB_NAME_KEY, jobName);
+    jobProperties.setProperty(ConfigurationKeys.JOB_ID_KEY, jobId);
+    jobProperties.setProperty(FileBasedJobLock.JOB_LOCK_DIR, jobLockDir);
+    jobProperties.setProperty(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
stateStoreDir);
+
+    jobLauncher = new GobblinTemporalJobLauncherForTest(jobProperties, 
appWorkDir);
+  }
+
+  @Test
+  public void testCancelWorkflowIfFailed() throws Exception {
+    // For workflowId to be generated
+    jobLauncher.submitJob(null);
+
+    // Mock the workflow status to be failed
+    when(mockExecutionInfo.getStatus())
+        .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED);
+
+    jobLauncher.executeCancellation();
+
+    verify(mockStub, times(0)).cancel();
+  }
+
+  @Test
+  public void testCancelWorkflowIfCompleted() throws Exception {
+    // For workflowId to be generated
+    jobLauncher.submitJob(null);
+
+    // Mock the workflow status to be completed
+    when(mockExecutionInfo.getStatus())
+        
.thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);
+
+    jobLauncher.executeCancellation();
+
+    verify(mockStub, times(0)).cancel();
+  }
+
+  @Test
+  public void testCancelWorkflowIfRunning() throws Exception {
+    // Mock the workflow status to be running
+    when(mockExecutionInfo.getStatus())
+        .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING);
+
+    jobLauncher.executeCancellation();
+
+    // Verify that the cancel method was not called without job submission
+    verify(mockStub, times(0)).cancel();
+
+    jobLauncher.submitJob(null);
+
+    jobLauncher.executeCancellation();
+
+    verify(mockStub, times(1)).cancel();
+  }
+
+  @Test
+  public void testCancelWorkflowFetchStatusThrowsException() throws Exception {
+    // Mock the get workflow status to throw an exception
+    Mockito.doThrow(new RuntimeException("Some exception 
occurred")).when(mockExecutionInfo).getStatus();
+
+    jobLauncher.submitJob(null);
+
+    jobLauncher.executeCancellation();
+
+    verify(mockStub, times(1)).cancel();
+
+    Mockito.reset(mockExecutionInfo);
+  }
+
+  @Test
+  public void testTerminateWorkflow() throws Exception {
+    // Mock the workflow status to be running
+    when(mockExecutionInfo.getStatus())
+        .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING);
+
+    // Mock getResult to throw TimeoutException
+    Mockito.doThrow(new TimeoutException("Workflow still in running"))
+        .when(mockStub).getResult(3L, TimeUnit.SECONDS, String.class, 
String.class);
+
+    jobLauncher.submitJob(null);
+
+    jobLauncher.executeCancellation();
+
+    verify(mockStub, times(1)).terminate("Job cancel invoked");
+  }
+}
\ No newline at end of file
diff --git a/gobblin-temporal/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/gobblin-temporal/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000000..1f0955d450
--- /dev/null
+++ b/gobblin-temporal/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline

Reply via email to