This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 20c434188f [GOBBLIN-2148] Add temporal workflow cancel support (#4045)
20c434188f is described below
commit 20c434188ff0715110d5f708b9e16ed8612059c4
Author: abhishekmjain <[email protected]>
AuthorDate: Mon Oct 7 23:24:39 2024 +0530
[GOBBLIN-2148] Add temporal workflow cancel support (#4045)
* Add temporal workflow cancel support
---
.../ddm/launcher/ExecuteGobblinJobLauncher.java | 4 +-
.../ddm/launcher/GenerateWorkUnitsJobLauncher.java | 3 +-
.../ddm/launcher/ProcessWorkUnitsJobLauncher.java | 3 +-
.../temporal/joblauncher/GobblinJobLauncher.java | 25 ++-
.../joblauncher/GobblinTemporalJobLauncher.java | 65 ++++++-
.../joblauncher/GobblinTemporalJobScheduler.java | 8 +
.../GobblinTemporalJobLauncherTest.java | 204 +++++++++++++++++++++
.../org.mockito.plugins.MockMaker | 1 +
8 files changed, 303 insertions(+), 10 deletions(-)
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
index 4c2e44c5bd..b2f65ccb41 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
@@ -83,10 +83,12 @@ public class ExecuteGobblinJobLauncher extends GobblinTemporalJobLauncher {
public void submitJob(List<WorkUnit> workunits) {
try {
Properties finalProps = adjustJobProperties(this.jobProps);
+ // Initialize workflowId.
+ this.workflowId =
Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE,
ConfigFactory.parseProperties(finalProps));
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
-
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE,
ConfigFactory.parseProperties(finalProps)))
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(finalProps))
+ .setWorkflowId(this.workflowId)
.build();
ExecuteGobblinWorkflow workflow =
this.client.newWorkflowStub(ExecuteGobblinWorkflow.class, options);
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
index ce0d8b732b..b058472c6b 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
@@ -71,10 +71,11 @@ public class GenerateWorkUnitsJobLauncher extends GobblinTemporalJobLauncher {
@Override
public void submitJob(List<WorkUnit> workunits) {
try {
+ this.workflowId =
Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE,
ConfigFactory.parseProperties(jobProps));
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps))
-
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE,
ConfigFactory.parseProperties(jobProps)))
+ .setWorkflowId(this.workflowId)
.build();
GenerateWorkUnitsWorkflow workflow =
this.client.newWorkflowStub(GenerateWorkUnitsWorkflow.class, options);
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index d0ce87c05d..709a9cf935 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -102,10 +102,11 @@ public class ProcessWorkUnitsJobLauncher extends GobblinTemporalJobLauncher {
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX,
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX));
+ this.workflowId =
Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec,
ConfigFactory.parseProperties(jobProps));
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps))
-
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec,
ConfigFactory.parseProperties(jobProps)))
+ .setWorkflowId(this.workflowId)
.build();
Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec,
Help.loadFileSystem(wuSpec)));
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
index 12d8861ecd..ea2c2ce7c2 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
@@ -22,6 +22,10 @@ import java.net.URI;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -87,7 +91,7 @@ public abstract class GobblinJobLauncher extends
AbstractJobLauncher {
protected final StateStores stateStores;
protected JobListener jobListener;
protected volatile boolean jobSubmitted = false;
-
+ private final ExecutorService executor;
public GobblinJobLauncher(Properties jobProps, Path appWorkDir,
List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean>
runningMap, EventBus eventbus)
@@ -122,6 +126,7 @@ public abstract class GobblinJobLauncher extends
AbstractJobLauncher {
this.taskStateCollectorService =
new TaskStateCollectorService(jobProps, this.jobContext.getJobState(),
this.eventBus, this.eventSubmitter,
this.stateStores.getTaskStateStore(), this.outputTaskStateDir,
this.getIssueRepository());
+ this.executor = Executors.newSingleThreadExecutor();
}
@Override
@@ -150,17 +155,23 @@ public abstract class GobblinJobLauncher extends
AbstractJobLauncher {
// Start the output TaskState collector service
this.taskStateCollectorService.startAsync().awaitRunning();
+ Future<?> submitJobFuture = null;
synchronized (this.cancellationRequest) {
if (!this.cancellationRequested) {
- submitJob(workUnits);
+ submitJobFuture = executor.submit(() -> {
+ try {
+ submitJob(workUnits);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
log.info(String.format("Submitted job %s",
this.jobContext.getJobId()));
this.jobSubmitted = true;
} else {
log.warn("Job {} not submitted as it was requested to be cancelled.", this.jobContext.getJobId());
}
}
-
- waitJob();
+ waitJob(submitJobFuture);
log.info(String.format("Job %s completed", this.jobContext.getJobId()));
} finally {
// The last iteration of output TaskState collecting will run when the collector service gets stopped
@@ -172,7 +183,11 @@ public abstract class GobblinJobLauncher extends AbstractJobLauncher {
protected void submitJob(List<WorkUnit> workUnits) throws Exception {
}
- protected void waitJob() throws InterruptedException {
+ protected void waitJob(Future<?> submitJobFuture)
+ throws InterruptedException, ExecutionException {
+ if (submitJobFuture != null) {
+ submitJobFuture.get();
+ }
}
@Override
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
index 82aeb8b20f..2d17fe20a3 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
@@ -20,12 +20,19 @@ package org.apache.gobblin.temporal.joblauncher;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import io.temporal.api.enums.v1.WorkflowExecutionStatus;
+import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
+import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
import io.temporal.client.WorkflowClient;
+import io.temporal.client.WorkflowFailedException;
+import io.temporal.client.WorkflowStub;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.workflow.Workflow;
@@ -65,10 +72,13 @@ import static
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClien
@Alpha
public abstract class GobblinTemporalJobLauncher extends GobblinJobLauncher {
private static final Logger log =
Workflow.getLogger(GobblinTemporalJobLauncher.class);
+ private static final int TERMINATION_TIMEOUT_SECONDS = 3;
protected WorkflowServiceStubs workflowServiceStubs;
protected WorkflowClient client;
protected String queueName;
+ protected String namespace;
+ protected String workflowId;
public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir,
List<? extends Tag<?>> metadataTags,
ConcurrentHashMap<String, Boolean> runningMap, EventBus eventBus)
@@ -79,11 +89,13 @@ public abstract class GobblinTemporalJobLauncher extends
GobblinJobLauncher {
String connectionUri = jobProps.getProperty(TEMPORAL_CONNECTION_STRING);
this.workflowServiceStubs = createServiceInstance(connectionUri);
- String namespace = jobProps.getProperty(GOBBLIN_TEMPORAL_NAMESPACE,
DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
+ this.namespace = jobProps.getProperty(GOBBLIN_TEMPORAL_NAMESPACE,
DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
this.client = createClientInstance(workflowServiceStubs, namespace);
this.queueName = jobProps.getProperty(GOBBLIN_TEMPORAL_TASK_QUEUE,
DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);
+ // non-null value indicates job has been submitted
+ this.workflowId = null;
startCancellationExecutor();
}
@@ -113,7 +125,56 @@ public abstract class GobblinTemporalJobLauncher extends
GobblinJobLauncher {
@Override
protected void executeCancellation() {
- log.info("Cancel temporal workflow");
+ if (this.workflowId == null) {
+ log.info("Cancellation of temporal workflow attempted without submitting it");
+ return;
+ }
+
+ log.info("Cancelling temporal workflow {}", this.workflowId);
+ try {
+ WorkflowStub workflowStub =
this.client.newUntypedWorkflowStub(this.workflowId);
+
+ // Describe the workflow execution to get its status
+ DescribeWorkflowExecutionRequest request =
DescribeWorkflowExecutionRequest.newBuilder()
+ .setNamespace(this.namespace)
+ .setExecution(workflowStub.getExecution())
+ .build();
+ DescribeWorkflowExecutionResponse response =
workflowServiceStubs.blockingStub().describeWorkflowExecution(request);
+
+ WorkflowExecutionStatus status;
+ try {
+ status = response.getWorkflowExecutionInfo().getStatus();
+ } catch (Exception e) {
+ log.warn("Exception occurred while getting status of the workflow " +
this.workflowId
+ + ". We would still attempt the cancellation", e);
+ workflowStub.cancel();
+ log.info("Temporal workflow {} cancelled successfully",
this.workflowId);
+ return;
+ }
+
+ // Check if the workflow is not finished
+ if (status !=
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED &&
+ status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED &&
+ status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED
&&
+ status !=
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED) {
+ workflowStub.cancel();
+ try {
+ // Check workflow status, if it is cancelled, will throw WorkflowFailedException else TimeoutException
+ workflowStub.getResult(TERMINATION_TIMEOUT_SECONDS,
TimeUnit.SECONDS, String.class, String.class);
+ } catch (TimeoutException te) {
+ // Workflow is still running, terminate it.
+ log.info("Workflow is still running, will attempt termination", te);
+ workflowStub.terminate("Job cancel invoked");
+ } catch (WorkflowFailedException wfe) {
+ // Do nothing as exception is expected.
+ }
+ log.info("Temporal workflow {} cancelled successfully",
this.workflowId);
+ } else {
+ log.info("Workflow {} is already finished with status {}",
this.workflowId, status);
+ }
+ } catch (Exception e) {
+ log.error("Exception occurred while cancelling the workflow " +
this.workflowId, e);
+ }
}
/** No-op: merely logs a warning, since not expected to be invoked */
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
index 76629aa68d..34a1cec4dc 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
@@ -210,6 +210,14 @@ public class GobblinTemporalJobScheduler extends JobScheduler implements Standar
LOGGER.info("No job schedule found, so running job " + jobUri);
GobblinTemporalJobLauncherListener listener = new
GobblinTemporalJobLauncherListener(this.launcherMetrics);
JobLauncher launcher = buildJobLauncher(newJobArrival.getJobConfig());
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ launcher.cancelJob(listener);
+ } catch (JobException e) {
+ LOGGER.error("Failed to cancel the job during shutdown", e);
+ throw new RuntimeException(e);
+ }
+ }));
launcher.launchJob(listener);
}
} catch (Exception je) {
diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
new file mode 100644
index 0000000000..98d0379b65
--- /dev/null
+++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.joblauncher;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.fs.Path;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Files;
+
+import io.temporal.api.common.v1.WorkflowExecution;
+import io.temporal.api.enums.v1.WorkflowExecutionStatus;
+import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
+import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
+import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
+import io.temporal.client.WorkflowClient;
+import io.temporal.client.WorkflowStub;
+import io.temporal.serviceclient.WorkflowServiceStubs;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.example.simplejson.SimpleJsonSource;
+import org.apache.gobblin.runtime.locks.FileBasedJobLock;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory;
+import org.apache.gobblin.util.JobLauncherUtils;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class GobblinTemporalJobLauncherTest {
+
+ private GobblinTemporalJobLauncher jobLauncher;
+ private MockedStatic<TemporalWorkflowClientFactory>
mockWorkflowClientFactory;
+ private WorkflowServiceStubs mockServiceStubs;
+ private WorkflowClient mockClient;
+ private WorkflowStub mockStub;
+ private WorkflowExecutionInfo mockExecutionInfo;
+ private Properties jobProperties;
+
+ class GobblinTemporalJobLauncherForTest extends GobblinTemporalJobLauncher {
+ public GobblinTemporalJobLauncherForTest(Properties jobProperties, Path
appWorkDir) throws Exception {
+ super(jobProperties, appWorkDir, new ArrayList<>(), new
ConcurrentHashMap<>(), null);
+ }
+
+ @Override
+ protected void submitJob(List<WorkUnit> workUnits)
+ throws Exception {
+ this.workflowId = "someWorkflowId";
+ }
+ }
+
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ mockServiceStubs = mock(WorkflowServiceStubs.class);
+ mockClient = mock(WorkflowClient.class);
+ mockExecutionInfo = mock(WorkflowExecutionInfo.class);
+ DescribeWorkflowExecutionResponse mockResponse =
mock(DescribeWorkflowExecutionResponse.class);
+ WorkflowServiceGrpc.WorkflowServiceBlockingStub mockBlockingStub =
mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class);
+ when(mockServiceStubs.blockingStub()).thenReturn(mockBlockingStub);
+
when(mockBlockingStub.describeWorkflowExecution(Mockito.any())).thenReturn(mockResponse);
+
when(mockResponse.getWorkflowExecutionInfo()).thenReturn(mockExecutionInfo);
+
+ mockWorkflowClientFactory =
Mockito.mockStatic(TemporalWorkflowClientFactory.class);
+ mockWorkflowClientFactory.when(() ->
TemporalWorkflowClientFactory.createServiceInstance(Mockito.anyString()))
+ .thenReturn(mockServiceStubs);
+ mockWorkflowClientFactory.when(() ->
TemporalWorkflowClientFactory.createClientInstance(Mockito.any(),
Mockito.anyString()))
+ .thenReturn(mockClient);
+
+ jobProperties = new Properties();
+ jobProperties.setProperty(ConfigurationKeys.FS_URI_KEY, "file:///");
+
jobProperties.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING,
"someConnString");
+ jobProperties.setProperty(ConfigurationKeys.JOB_LOCK_TYPE,
FileBasedJobLock.class.getName());
+ jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY,
SimpleJsonSource.class.getName());
+ }
+
+ @BeforeMethod
+ public void methodSetUp() throws Exception {
+ mockStub = mock(WorkflowStub.class);
+
when(mockClient.newUntypedWorkflowStub(Mockito.anyString())).thenReturn(mockStub);
+
when(mockStub.getExecution()).thenReturn(WorkflowExecution.getDefaultInstance());
+
+ File tmpDir = Files.createTempDir();
+ String basePath = tmpDir.getAbsolutePath();
+ Path appWorkDir = new Path(basePath, "testAppWorkDir");
+ String jobLockDir = new Path(basePath, "jobLockDir").toString();
+ String stateStoreDir = new Path(basePath, "stateStoreDir").toString();
+ String jobName = "testJob";
+ String jobId = JobLauncherUtils.newJobId(jobName);
+ jobProperties.setProperty(ConfigurationKeys.JOB_NAME_KEY, jobName);
+ jobProperties.setProperty(ConfigurationKeys.JOB_ID_KEY, jobId);
+ jobProperties.setProperty(FileBasedJobLock.JOB_LOCK_DIR, jobLockDir);
+ jobProperties.setProperty(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
stateStoreDir);
+
+ jobLauncher = new GobblinTemporalJobLauncherForTest(jobProperties,
appWorkDir);
+ }
+
+ @Test
+ public void testCancelWorkflowIfFailed() throws Exception {
+ // For workflowId to be generated
+ jobLauncher.submitJob(null);
+
+ // Mock the workflow status to be failed
+ when(mockExecutionInfo.getStatus())
+ .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED);
+
+ jobLauncher.executeCancellation();
+
+ verify(mockStub, times(0)).cancel();
+ }
+
+ @Test
+ public void testCancelWorkflowIfCompleted() throws Exception {
+ // For workflowId to be generated
+ jobLauncher.submitJob(null);
+
+ // Mock the workflow status to be completed
+ when(mockExecutionInfo.getStatus())
+
.thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);
+
+ jobLauncher.executeCancellation();
+
+ verify(mockStub, times(0)).cancel();
+ }
+
+ @Test
+ public void testCancelWorkflowIfRunning() throws Exception {
+ // Mock the workflow status to be running
+ when(mockExecutionInfo.getStatus())
+ .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING);
+
+ jobLauncher.executeCancellation();
+
+ // Verify that the cancel method was not called without job submission
+ verify(mockStub, times(0)).cancel();
+
+ jobLauncher.submitJob(null);
+
+ jobLauncher.executeCancellation();
+
+ verify(mockStub, times(1)).cancel();
+ }
+
+ @Test
+ public void testCancelWorkflowFetchStatusThrowsException() throws Exception {
+ // Mock the get workflow status to throw an exception
+ Mockito.doThrow(new RuntimeException("Some exception occurred")).when(mockExecutionInfo).getStatus();
+
+ jobLauncher.submitJob(null);
+
+ jobLauncher.executeCancellation();
+
+ verify(mockStub, times(1)).cancel();
+
+ Mockito.reset(mockExecutionInfo);
+ }
+
+ @Test
+ public void testTerminateWorkflow() throws Exception {
+ // Mock the workflow status to be running
+ when(mockExecutionInfo.getStatus())
+ .thenReturn(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING);
+
+ // Mock getResult to throw TimeoutException
+ Mockito.doThrow(new TimeoutException("Workflow still in running"))
+ .when(mockStub).getResult(3L, TimeUnit.SECONDS, String.class,
String.class);
+
+ jobLauncher.submitJob(null);
+
+ jobLauncher.executeCancellation();
+
+ verify(mockStub, times(1)).terminate("Job cancel invoked");
+ }
+}
\ No newline at end of file
diff --git a/gobblin-temporal/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/gobblin-temporal/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000000..1f0955d450
--- /dev/null
+++ b/gobblin-temporal/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline