This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e857c002c6 [GOBBLIN-2133] provide job future before calling
SpecProducer::cancelJob (#4027)
e857c002c6 is described below
commit e857c002c6191a5ed94aa9efad09d4006d9d4db0
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Aug 15 15:42:00 2024 -0700
[GOBBLIN-2133] provide job future before calling SpecProducer::cancelJob
(#4027)
* correct order in killing
* add more unit tests
---
.../spec_executorInstance/MockedSpecExecutor.java | 3 +-
gobblin-service/build.gradle | 2 +
.../modules/orchestration/proc/DagProcUtils.java | 42 +++++++-------
.../service/monitoring/KafkaJobStatusMonitor.java | 2 +-
.../orchestration/proc/KillDagProcTest.java | 64 +++++++++++++++++++++-
.../orchestration/proc/LaunchDagProcTest.java | 29 ++++++++--
gradle/scripts/dependencyDefinitions.gradle | 2 +
7 files changed, 114 insertions(+), 30 deletions(-)
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
index 7647b60e27..dfcc88ed13 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
@@ -42,13 +42,14 @@ import static org.mockito.Mockito.when;
public class MockedSpecExecutor extends InMemorySpecExecutor {
private final SpecProducer<Spec> mockedSpecProducer;
private final Config config;
+ public static final String dummySerializedFuture = "12345";
public MockedSpecExecutor(Config config) {
super(config);
this.config = config;
this.mockedSpecProducer = Mockito.mock(SpecProducer.class);
when(mockedSpecProducer.addSpec(any())).thenReturn(new
CompletedFuture(Boolean.TRUE, null));
- when(mockedSpecProducer.serializeAddSpecResponse(any())).thenReturn("");
+
when(mockedSpecProducer.serializeAddSpecResponse(any())).thenReturn(dummySerializedFuture);
when(mockedSpecProducer.deserializeAddSpecResponse(any())).thenReturn(new
CompletedFuture(Boolean.TRUE, null));
when(mockedSpecProducer.cancelJob(any(), any())).thenReturn(new
CompletedFuture(Boolean.TRUE, null));
}
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index 80b2d7e396..b0a051a5bd 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -91,6 +91,8 @@ dependencies {
testCompile externalDependency.hamcrest
testCompile externalDependency.jhyde
testCompile externalDependency.mockitoInline
+ testCompile externalDependency.powerMockApi
+ testCompile externalDependency.powerMockModule
testCompile externalDependency.testContainers
testCompile externalDependency.testContainersMysql
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 21285bb994..289454502f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -24,16 +24,17 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.JobSpec;
@@ -64,7 +65,7 @@ public class DagProcUtils {
/**
* If there is a single job to run next, it runs it. If there are multiple
jobs to run, it creates a
- * {@link
org.apache.gobblin.service.modules.orchestration.DagActionStore.DagActionType#REEVALUATE}
dag action for
+ * {@link DagActionStore.DagActionType#REEVALUATE} dag action for
* each of them and those jobs will be launched in respective {@link
ReevaluateDagProc}.
*/
public static void submitNextNodes(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag,
@@ -85,7 +86,7 @@ public class DagProcUtils {
/**
* - submits a {@link JobSpec} to a {@link SpecExecutor}
- * - emits a {@link TimingEvent.LauncherTimings#JOB_ORCHESTRATED} {@link
org.apache.gobblin.metrics.GobblinTrackingEvent}
+ * - emits a {@link TimingEvent.LauncherTimings#JOB_ORCHESTRATED} {@link
GobblinTrackingEvent}
* that measures the time needed to submit the job to {@link SpecExecutor}
* - increment running jobs counter for the {@link Dag}, the proxy user that
submitted the job and the {@link SpecExecutor} job was sent to
* - add updated dag node state to dagManagementStateStore
@@ -122,7 +123,7 @@ public class DagProcUtils {
Future<?> addSpecFuture = producer.addSpec(jobSpec);
// todo - we should add future.get() instead of the complete future into
the JobExecutionPlan
-
dagNode.getValue().setJobFuture(com.google.common.base.Optional.of(addSpecFuture));
+ dagNode.getValue().setJobFuture(Optional.of(addSpecFuture));
addSpecFuture.get();
jobExecutionPlan.setExecutionStatus(ExecutionStatus.ORCHESTRATED);
jobMetadata.put(TimingEvent.METADATA_MESSAGE,
producer.getExecutionLink(addSpecFuture, specExecutorUri));
@@ -155,28 +156,32 @@ public class DagProcUtils {
}
public static void cancelDagNode(Dag.DagNode<JobExecutionPlan>
dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws
IOException {
- Properties props = new Properties();
+ Properties cancelJobArgs = new Properties();
DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
+ String serializedFuture = null;
if
(dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
{
- props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+ cancelJobArgs.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
}
try {
-
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
props).get();
+ if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+ Future<?> future = dagNodeToCancel.getValue().getJobFuture().get();
+ serializedFuture =
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+ cancelJobArgs.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE,
serializedFuture);
+ } else {
+ log.warn("No Job future when canceling DAG node - {}",
dagNodeToCancel.getValue().getId());
+ }
+
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
cancelJobArgs).get();
// add back the dag node with updated states in the store
+ dagNodeToCancel.getValue().setExecutionStatus(CANCELLED);
dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId);
// send cancellation event after updating the state, because
cancellation event triggers a ReevaluateDagAction
// that will delete the dag. Due to race condition between adding dag
node and deleting dag, state store may get
// into inconsistent state.
- if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
- sendCancellationEvent(dagNodeToCancel, props);
- log.info("Cancelled dag node {}, spec_producer_future {}",
dagNodeToCancel.getValue().getId(),
- props.get(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE));
- } else {
- log.warn("No Job future when canceling DAG node - {}",
dagNodeToCancel.getValue().getId());
- }
+ sendCancellationEvent(dagNodeToCancel);
+ log.info("Cancelled dag node {}, spec_producer_future {}",
dagNodeToCancel.getValue().getId(), serializedFuture);
} catch (Exception e) {
throw new IOException(e);
}
@@ -191,19 +196,14 @@ public class DagProcUtils {
}
}
- private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan>
dagNodeToCancel, Properties props)
- throws ExecutionException, InterruptedException {
+ private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan>
dagNodeToCancel) {
JobExecutionPlan jobExecutionPlan = dagNodeToCancel.getValue();
- Future<?> future = jobExecutionPlan.getJobFuture().get();
- String serializedFuture =
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
- props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE,
serializedFuture);
Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
- jobExecutionPlan.setExecutionStatus(CANCELLED);
}
/**
- * Sets {@link Dag#flowEvent} and emits a {@link
org.apache.gobblin.metrics.GobblinTrackingEvent} of the provided
+ * Sets {@link Dag#flowEvent} and emits a {@link GobblinTrackingEvent} of
the provided
* flow event type.
*/
public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter,
Dag<JobExecutionPlan> dag, String flowEvent) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index b897f74381..c2ee1308f5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -345,7 +345,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
NewState newState = newState(jobStatus, states);
String newStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
if (newState == NewState.FINISHED) {
- log.info("Flow {}:{}:{}:{} reached a terminal state {}", flowGroup,
flowName, flowExecutionId, jobName, newStatus);
+ log.info("Flow/Job {}:{}:{}:{} reached a terminal state {}",
flowGroup, flowName, flowExecutionId, jobName, newStatus);
}
return ImmutablePair.of(jobStatus, newState);
} catch (Exception e) {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
index 519a54f3e7..cd34b47cb9 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
@@ -21,14 +21,20 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Optional;
+import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.junit.runner.RunWith;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.typesafe.config.ConfigFactory;
@@ -37,8 +43,12 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
@@ -56,15 +66,24 @@ import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.monitoring.JobStatus;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
+import static org.powermock.reflect.Whitebox.setInternalState;
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EventSubmitter.class)
public class KillDagProcTest {
private MySqlDagManagementStateStore dagManagementStateStore;
private ITestMetastoreDatabase testDb;
private DagProcessingEngineMetrics mockedDagProcEngineMetrics;
+ private MockedStatic<DagProc> dagProc;
+ private EventSubmitter mockedEventSubmitter;
@BeforeClass
public void setUp() throws Exception {
@@ -72,6 +91,13 @@ public class KillDagProcTest {
this.dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testDb));
LaunchDagProcTest.mockDMSSCommonBehavior(this.dagManagementStateStore);
this.mockedDagProcEngineMetrics =
Mockito.mock(DagProcessingEngineMetrics.class);
+ this.dagProc = mockStatic(DagProc.class);
+ }
+
+ @BeforeMethod
+ public void resetMocks() {
+ this.mockedEventSubmitter = spy(new
EventSubmitter.Builder(RootMetricContext.get(),
"org.apache.gobblin.service").build());
+ setInternalState(DagProc.class, "eventSubmitter",
this.mockedEventSubmitter);
}
@AfterClass(alwaysRun = true)
@@ -80,8 +106,11 @@ public class KillDagProcTest {
// `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
this.testDb.close();
}
+ this.dagProc.close();
}
+ // launches the flow, submits the first job, and then kills the dag.
+ // all the jobs are killed; the first job, which was already launched, is killed
using the job future object.
@Test
public void killDag() throws IOException, URISyntaxException,
InterruptedException {
long flowExecutionId = System.currentTimeMillis();
@@ -112,15 +141,35 @@ public class KillDagProcTest {
null, this.dagManagementStateStore, mockedDagProcEngineMetrics),
ConfigFactory.empty());
killDagProc.process(this.dagManagementStateStore,
this.mockedDagProcEngineMetrics);
- long cancelJobCount = specProducers.stream()
+ int numOfLaunchedJobs = 1;
+ int numOfCancelledJobs = 5; // all jobs in the dag
+ int numOfCancelledFlows = 1;
+ int numOfCancelledJobsWithJobFuture = numOfLaunchedJobs;
+ long actualCancelJobCount = specProducers.stream()
.mapToLong(p -> Mockito.mockingDetails(p)
.getInvocations()
.stream()
.filter(a -> a.getMethod().getName().equals("cancelJob"))
.count())
.sum();
+
+ // kill dag proc kills only the launched jobs with cancel parameters containing
the jobFuture
+ Mockito.verify(specProducers.get(0),
Mockito.times(numOfCancelledJobsWithJobFuture)).cancelJob(any(), argThat(props
->
+ props.getProperty(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE,
"ABSENT").equals(MockedSpecExecutor.dummySerializedFuture)));
+
+ // the job future object is not available in the cancel parameters for the rest
of the jobs
+ specProducers.stream()
+ .skip(numOfCancelledJobsWithJobFuture) // separately verified
`specProducers.get(0)` above
+ .forEach(sp -> Mockito.verify(sp, Mockito.never()).cancelJob(any(),
argThat(props ->
+
props.getProperty(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE,
"ABSENT").equals(MockedSpecExecutor.dummySerializedFuture))));
+
// kill dag proc tries to cancel all the dag nodes
- Assert.assertEquals(cancelJobCount, 5);
+ Assert.assertEquals(actualCancelJobCount, numOfCancelledJobs);
+
+ Mockito.verify(this.mockedEventSubmitter,
Mockito.times(numOfCancelledJobs))
+ .submit(eq(TimingEvent.LauncherTimings.JOB_CANCEL), anyMap());
+ Mockito.verify(this.mockedEventSubmitter,
Mockito.times(numOfCancelledFlows))
+ .submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap());
}
@Test
@@ -159,14 +208,23 @@ public class KillDagProcTest {
null, this.dagManagementStateStore, this.mockedDagProcEngineMetrics),
ConfigFactory.empty());
killDagProc.process(this.dagManagementStateStore,
this.mockedDagProcEngineMetrics);
+ int numOfCancelledJobs = 1; // the only job that was cancelled
+ int numOfCancelledFlows = 1;
long cancelJobCount = specProducers.stream()
.mapToLong(p -> Mockito.mockingDetails(p)
.getInvocations()
.stream()
.filter(a -> a.getMethod().getName().equals("cancelJob"))
+ .filter(a -> ((Properties) a.getArgument(1))
+
.getProperty(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE).equals(MockedSpecExecutor.dummySerializedFuture))
.count())
.sum();
// kill dag proc tries to cancel only the exact dag node that was provided
- Assert.assertEquals(cancelJobCount, 1);
+ Assert.assertEquals(cancelJobCount, numOfCancelledJobs);
+
+ Mockito.verify(this.mockedEventSubmitter,
Mockito.times(numOfCancelledJobs))
+ .submit(eq(TimingEvent.LauncherTimings.JOB_CANCEL), anyMap());
+ Mockito.verify(this.mockedEventSubmitter,
Mockito.times(numOfCancelledFlows))
+ .submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap());
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 45caa295ea..99caa9a005 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -24,7 +24,11 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
+import org.junit.runner.RunWith;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -38,6 +42,9 @@ import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
@@ -65,20 +72,23 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.*;
+import static org.powermock.reflect.Whitebox.setInternalState;
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EventSubmitter.class)
public class LaunchDagProcTest {
private ITestMetastoreDatabase testMetastoreDatabase;
private MySqlDagManagementStateStore dagManagementStateStore;
private DagProcessingEngineMetrics mockedDagProcEngineMetrics;
+ private MockedStatic<DagProc> dagProc;
+ private EventSubmitter mockedEventSubmitter;
@BeforeClass
public void setUp() throws Exception {
this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ this.dagProc = mockStatic(DagProc.class);
}
/**
@@ -89,12 +99,15 @@ public class LaunchDagProcTest {
this.dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
mockDMSSCommonBehavior(this.dagManagementStateStore);
this.mockedDagProcEngineMetrics =
Mockito.mock(DagProcessingEngineMetrics.class);
+ this.mockedEventSubmitter = spy(new
EventSubmitter.Builder(RootMetricContext.get(),
"org.apache.gobblin.service").build());
+ setInternalState(DagProc.class, "eventSubmitter",
this.mockedEventSubmitter);
}
@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
// `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
this.testMetastoreDatabase.close();
+ this.dagProc.close();
}
@Test
@@ -127,6 +140,10 @@ public class LaunchDagProcTest {
Mockito.verify(this.dagManagementStateStore,
Mockito.times(numOfLaunchedJobs))
.addJobDagAction(any(), any(), anyLong(),
eq(DagActionStore.NO_JOB_NAME_DEFAULT),
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
+
+ // FLOW_RUNNING is emitted exactly once per flow during the execution of
LaunchDagProc
+ Mockito.verify(this.mockedEventSubmitter, Mockito.times(1))
+ .submit(eq(TimingEvent.FlowTimings.FLOW_RUNNING), anyMap());
}
@Test
@@ -153,6 +170,10 @@ public class LaunchDagProcTest {
// parallel jobs are launched through reevaluate dag action
Mockito.verify(this.dagManagementStateStore,
Mockito.times(numOfLaunchedJobs))
.addJobDagAction(eq(flowGroup), eq(flowName), eq(flowExecutionId),
any(), eq(DagActionStore.DagActionType.REEVALUATE));
+
+ // FLOW_RUNNING is emitted exactly once per flow during the execution of
LaunchDagProc
+ Mockito.verify(this.mockedEventSubmitter, Mockito.times(1))
+ .submit(eq(TimingEvent.FlowTimings.FLOW_RUNNING), anyMap());
}
// This creates a dag like this
diff --git a/gradle/scripts/dependencyDefinitions.gradle
b/gradle/scripts/dependencyDefinitions.gradle
index 3847f9fc06..966ef824d7 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -204,6 +204,8 @@ ext.externalDependency = [
'parquetAvro': 'org.apache.parquet:parquet-avro:1.11.0',
'parquetProto': 'org.apache.parquet:parquet-protobuf:1.11.0',
'parquetHadoop': 'org.apache.parquet:parquet-hadoop-bundle:1.11.0',
+ 'powerMockApi' : 'org.powermock:powermock-api-mockito2:2.0.9',
+ 'powerMockModule' : 'org.powermock:powermock-module-junit4:2.0.9',
'reactivex': 'io.reactivex.rxjava2:rxjava:2.1.0',
"slf4j": [
"org.slf4j:slf4j-api:" + slf4jVersion,