This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e857c002c6 [GOBBLIN-2133] provide job future before calling SpecProducer::cancelJob (#4027)
e857c002c6 is described below

commit e857c002c6191a5ed94aa9efad09d4006d9d4db0
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Aug 15 15:42:00 2024 -0700

    [GOBBLIN-2133] provide job future before calling SpecProducer::cancelJob (#4027)
    
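    * correct the order of operations when killing a dag node: add the serialized job future to the cancel arguments before calling SpecProducer::cancelJob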
    * add more unit tests
---
 .../spec_executorInstance/MockedSpecExecutor.java  |  3 +-
 gobblin-service/build.gradle                       |  2 +
 .../modules/orchestration/proc/DagProcUtils.java   | 42 +++++++-------
 .../service/monitoring/KafkaJobStatusMonitor.java  |  2 +-
 .../orchestration/proc/KillDagProcTest.java        | 64 +++++++++++++++++++++-
 .../orchestration/proc/LaunchDagProcTest.java      | 29 ++++++++--
 gradle/scripts/dependencyDefinitions.gradle        |  2 +
 7 files changed, 114 insertions(+), 30 deletions(-)

diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
index 7647b60e27..dfcc88ed13 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
@@ -42,13 +42,14 @@ import static org.mockito.Mockito.when;
 public class MockedSpecExecutor extends InMemorySpecExecutor {
   private final SpecProducer<Spec> mockedSpecProducer;
   private final Config config;
+  public static final String dummySerializedFuture = "12345";
 
   public MockedSpecExecutor(Config config) {
     super(config);
     this.config = config;
     this.mockedSpecProducer = Mockito.mock(SpecProducer.class);
     when(mockedSpecProducer.addSpec(any())).thenReturn(new 
CompletedFuture(Boolean.TRUE, null));
-    when(mockedSpecProducer.serializeAddSpecResponse(any())).thenReturn("");
+    
when(mockedSpecProducer.serializeAddSpecResponse(any())).thenReturn(dummySerializedFuture);
     when(mockedSpecProducer.deserializeAddSpecResponse(any())).thenReturn(new 
CompletedFuture(Boolean.TRUE, null));
     when(mockedSpecProducer.cancelJob(any(), any())).thenReturn(new 
CompletedFuture(Boolean.TRUE, null));
     }
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index 80b2d7e396..b0a051a5bd 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -91,6 +91,8 @@ dependencies {
   testCompile externalDependency.hamcrest
   testCompile externalDependency.jhyde
   testCompile externalDependency.mockitoInline
+  testCompile externalDependency.powerMockApi
+  testCompile externalDependency.powerMockModule
   testCompile externalDependency.testContainers
   testCompile externalDependency.testContainersMysql
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 21285bb994..289454502f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -24,16 +24,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.JobSpec;
@@ -64,7 +65,7 @@ public class DagProcUtils {
 
   /**
    * If there is a single job to run next, it runs it. If there are multiple 
jobs to run, it creates a
-   * {@link 
org.apache.gobblin.service.modules.orchestration.DagActionStore.DagActionType#REEVALUATE}
 dag action for
+   * {@link DagActionStore.DagActionType#REEVALUATE} dag action for
    * each of them and those jobs will be launched in respective {@link 
ReevaluateDagProc}.
    */
   public static void submitNextNodes(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag,
@@ -85,7 +86,7 @@ public class DagProcUtils {
 
   /**
    * - submits a {@link JobSpec} to a {@link SpecExecutor}
-   * - emits a {@link TimingEvent.LauncherTimings#JOB_ORCHESTRATED} {@link 
org.apache.gobblin.metrics.GobblinTrackingEvent}
+   * - emits a {@link TimingEvent.LauncherTimings#JOB_ORCHESTRATED} {@link 
GobblinTrackingEvent}
    * that measures the time needed to submit the job to {@link SpecExecutor}
    * - increment running jobs counter for the {@link Dag}, the proxy user that 
submitted the job and the {@link SpecExecutor} job was sent to
    * - add updated dag node state to dagManagementStateStore
@@ -122,7 +123,7 @@ public class DagProcUtils {
 
       Future<?> addSpecFuture = producer.addSpec(jobSpec);
       // todo - we should add future.get() instead of the complete future into 
the JobExecutionPlan
-      
dagNode.getValue().setJobFuture(com.google.common.base.Optional.of(addSpecFuture));
+      dagNode.getValue().setJobFuture(Optional.of(addSpecFuture));
       addSpecFuture.get();
       jobExecutionPlan.setExecutionStatus(ExecutionStatus.ORCHESTRATED);
       jobMetadata.put(TimingEvent.METADATA_MESSAGE, 
producer.getExecutionLink(addSpecFuture, specExecutorUri));
@@ -155,28 +156,32 @@ public class DagProcUtils {
   }
 
   public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> 
dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws 
IOException {
-    Properties props = new Properties();
+    Properties cancelJobArgs = new Properties();
     DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
+    String serializedFuture = null;
 
     if 
(dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
 {
-      props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+      cancelJobArgs.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
           
dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
     }
 
     try {
-      
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
 props).get();
+      if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+        Future<?> future = dagNodeToCancel.getValue().getJobFuture().get();
+        serializedFuture = 
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+        cancelJobArgs.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, 
serializedFuture);
+      } else {
+        log.warn("No Job future when canceling DAG node - {}", 
dagNodeToCancel.getValue().getId());
+      }
+      
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
 cancelJobArgs).get();
       // add back the dag node with updated states in the store
+      dagNodeToCancel.getValue().setExecutionStatus(CANCELLED);
       dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId);
       // send cancellation event after updating the state, because 
cancellation event triggers a ReevaluateDagAction
       // that will delete the dag. Due to race condition between adding dag 
node and deleting dag, state store may get
       // into inconsistent state.
-      if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
-        sendCancellationEvent(dagNodeToCancel, props);
-        log.info("Cancelled dag node {}, spec_producer_future {}", 
dagNodeToCancel.getValue().getId(),
-            props.get(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE));
-      } else {
-        log.warn("No Job future when canceling DAG node - {}", 
dagNodeToCancel.getValue().getId());
-      }
+      sendCancellationEvent(dagNodeToCancel);
+      log.info("Cancelled dag node {}, spec_producer_future {}", 
dagNodeToCancel.getValue().getId(), serializedFuture);
     } catch (Exception e) {
       throw new IOException(e);
     }
@@ -191,19 +196,14 @@ public class DagProcUtils {
     }
   }
 
-  private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> 
dagNodeToCancel, Properties props)
-      throws ExecutionException, InterruptedException {
+  private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> 
dagNodeToCancel) {
     JobExecutionPlan jobExecutionPlan = dagNodeToCancel.getValue();
-    Future<?> future = jobExecutionPlan.getJobFuture().get();
-    String serializedFuture = 
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
-    props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, 
serializedFuture);
     Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
     
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
-    jobExecutionPlan.setExecutionStatus(CANCELLED);
   }
 
   /**
-   * Sets {@link Dag#flowEvent} and emits a {@link 
org.apache.gobblin.metrics.GobblinTrackingEvent} of the provided
+   * Sets {@link Dag#flowEvent} and emits a {@link GobblinTrackingEvent} of 
the provided
    * flow event type.
    */
   public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, 
Dag<JobExecutionPlan> dag, String flowEvent) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index b897f74381..c2ee1308f5 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -345,7 +345,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
       NewState newState = newState(jobStatus, states);
       String newStatus = 
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
       if (newState == NewState.FINISHED) {
-        log.info("Flow {}:{}:{}:{} reached a terminal state {}", flowGroup, 
flowName, flowExecutionId, jobName, newStatus);
+        log.info("Flow/Job {}:{}:{}:{} reached a terminal state {}", 
flowGroup, flowName, flowExecutionId, jobName, newStatus);
       }
       return ImmutablePair.of(jobStatus, newState);
     } catch (Exception e) {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
index 519a54f3e7..cd34b47cb9 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
@@ -21,14 +21,20 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.junit.runner.RunWith;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.typesafe.config.ConfigFactory;
@@ -37,8 +43,12 @@ import com.typesafe.config.ConfigValueFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
@@ -56,15 +66,24 @@ import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 import org.apache.gobblin.service.monitoring.JobStatus;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.spy;
+import static org.powermock.reflect.Whitebox.setInternalState;
 
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EventSubmitter.class)
 public class KillDagProcTest {
   private MySqlDagManagementStateStore dagManagementStateStore;
   private ITestMetastoreDatabase testDb;
   private DagProcessingEngineMetrics mockedDagProcEngineMetrics;
+  private MockedStatic<DagProc> dagProc;
+  private EventSubmitter mockedEventSubmitter;
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -72,6 +91,13 @@ public class KillDagProcTest {
     this.dagManagementStateStore = 
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testDb));
     LaunchDagProcTest.mockDMSSCommonBehavior(this.dagManagementStateStore);
     this.mockedDagProcEngineMetrics = 
Mockito.mock(DagProcessingEngineMetrics.class);
+    this.dagProc = mockStatic(DagProc.class);
+  }
+
+  @BeforeMethod
+  public void resetMocks() {
+    this.mockedEventSubmitter = spy(new 
EventSubmitter.Builder(RootMetricContext.get(), 
"org.apache.gobblin.service").build());
+    setInternalState(DagProc.class, "eventSubmitter", 
this.mockedEventSubmitter);
   }
 
   @AfterClass(alwaysRun = true)
@@ -80,8 +106,11 @@ public class KillDagProcTest {
       // `.close()` to avoid (in the aggregate, across multiple suites) - 
java.sql.SQLNonTransientConnectionException: Too many connections
       this.testDb.close();
     }
+    this.dagProc.close();
   }
 
+  // launches the flow, submits first job, and then kills the dag.
+  // all the jobs are killed and first job that was already launched is killed 
with the job future object.
   @Test
   public void killDag() throws IOException, URISyntaxException, 
InterruptedException {
     long flowExecutionId = System.currentTimeMillis();
@@ -112,15 +141,35 @@ public class KillDagProcTest {
         null, this.dagManagementStateStore, mockedDagProcEngineMetrics), 
ConfigFactory.empty());
     killDagProc.process(this.dagManagementStateStore, 
this.mockedDagProcEngineMetrics);
 
-    long cancelJobCount = specProducers.stream()
+    int numOfLaunchedJobs = 1;
+    int numOfCancelledJobs = 5; // all jobs in the dag
+    int numOfCancelledFlows = 1;
+    int numOfCancelledJobsWithJobFuture = numOfLaunchedJobs;
+    long actualCancelJobCount = specProducers.stream()
         .mapToLong(p -> Mockito.mockingDetails(p)
             .getInvocations()
             .stream()
             .filter(a -> a.getMethod().getName().equals("cancelJob"))
             .count())
         .sum();
+
+    // kill dag procs kill only the launched jobs with parameters containing 
jobFuture
+    Mockito.verify(specProducers.get(0), 
Mockito.times(numOfCancelledJobsWithJobFuture)).cancelJob(any(), argThat(props 
->
+        props.getProperty(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, 
"ABSENT").equals(MockedSpecExecutor.dummySerializedFuture)));
+
+    // job future object is not available for rest of the jobs cancel 
parameters
+    specProducers.stream()
+        .skip(numOfCancelledJobsWithJobFuture)  // separately verified 
`specProducers.get(0)` above
+        .forEach(sp -> Mockito.verify(sp, Mockito.never()).cancelJob(any(), 
argThat(props ->
+            
props.getProperty(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, 
"ABSENT").equals(MockedSpecExecutor.dummySerializedFuture))));
+
     // kill dag proc tries to cancel all the dag nodes
-    Assert.assertEquals(cancelJobCount, 5);
+    Assert.assertEquals(actualCancelJobCount, numOfCancelledJobs);
+
+    Mockito.verify(this.mockedEventSubmitter, 
Mockito.times(numOfCancelledJobs))
+        .submit(eq(TimingEvent.LauncherTimings.JOB_CANCEL), anyMap());
+    Mockito.verify(this.mockedEventSubmitter, 
Mockito.times(numOfCancelledFlows))
+        .submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap());
   }
 
   @Test
@@ -159,14 +208,23 @@ public class KillDagProcTest {
         null, this.dagManagementStateStore, this.mockedDagProcEngineMetrics), 
ConfigFactory.empty());
     killDagProc.process(this.dagManagementStateStore, 
this.mockedDagProcEngineMetrics);
 
+    int numOfCancelledJobs = 1; // the only job that was cancelled
+    int numOfCancelledFlows = 1;
     long cancelJobCount = specProducers.stream()
         .mapToLong(p -> Mockito.mockingDetails(p)
             .getInvocations()
             .stream()
             .filter(a -> a.getMethod().getName().equals("cancelJob"))
+            .filter(a -> ((Properties) a.getArgument(1))
+                
.getProperty(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE).equals(MockedSpecExecutor.dummySerializedFuture))
             .count())
         .sum();
     // kill dag proc tries to cancel only the exact dag node that was provided
-    Assert.assertEquals(cancelJobCount, 1);
+    Assert.assertEquals(cancelJobCount, numOfCancelledJobs);
+
+    Mockito.verify(this.mockedEventSubmitter, 
Mockito.times(numOfCancelledJobs))
+        .submit(eq(TimingEvent.LauncherTimings.JOB_CANCEL), anyMap());
+    Mockito.verify(this.mockedEventSubmitter, 
Mockito.times(numOfCancelledFlows))
+        .submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap());
   }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 45caa295ea..99caa9a005 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -24,7 +24,11 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
+import org.junit.runner.RunWith;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -38,6 +42,9 @@ import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
@@ -65,20 +72,23 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.*;
+import static org.powermock.reflect.Whitebox.setInternalState;
 
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EventSubmitter.class)
 public class LaunchDagProcTest {
   private ITestMetastoreDatabase testMetastoreDatabase;
   private MySqlDagManagementStateStore dagManagementStateStore;
   private DagProcessingEngineMetrics mockedDagProcEngineMetrics;
+  private MockedStatic<DagProc> dagProc;
+  private EventSubmitter mockedEventSubmitter;
 
   @BeforeClass
   public void setUp() throws Exception {
     this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+    this.dagProc = mockStatic(DagProc.class);
   }
 
   /**
@@ -89,12 +99,15 @@ public class LaunchDagProcTest {
     this.dagManagementStateStore = 
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
     mockDMSSCommonBehavior(this.dagManagementStateStore);
     this.mockedDagProcEngineMetrics = 
Mockito.mock(DagProcessingEngineMetrics.class);
+    this.mockedEventSubmitter = spy(new 
EventSubmitter.Builder(RootMetricContext.get(), 
"org.apache.gobblin.service").build());
+    setInternalState(DagProc.class, "eventSubmitter", 
this.mockedEventSubmitter);
   }
 
   @AfterClass(alwaysRun = true)
   public void tearDown() throws Exception {
     // `.close()` to avoid (in the aggregate, across multiple suites) - 
java.sql.SQLNonTransientConnectionException: Too many connections
     this.testMetastoreDatabase.close();
+    this.dagProc.close();
   }
 
   @Test
@@ -127,6 +140,10 @@ public class LaunchDagProcTest {
 
     Mockito.verify(this.dagManagementStateStore, 
Mockito.times(numOfLaunchedJobs))
         .addJobDagAction(any(), any(), anyLong(), 
eq(DagActionStore.NO_JOB_NAME_DEFAULT), 
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
+
+    // FLOW_RUNNING is emitted exactly once per flow during the execution of 
LaunchDagProc
+    Mockito.verify(this.mockedEventSubmitter, Mockito.times(1))
+        .submit(eq(TimingEvent.FlowTimings.FLOW_RUNNING), anyMap());
   }
 
   @Test
@@ -153,6 +170,10 @@ public class LaunchDagProcTest {
     // parallel jobs are launched through reevaluate dag action
     Mockito.verify(this.dagManagementStateStore, 
Mockito.times(numOfLaunchedJobs))
         .addJobDagAction(eq(flowGroup), eq(flowName), eq(flowExecutionId), 
any(), eq(DagActionStore.DagActionType.REEVALUATE));
+
+    // FLOW_RUNNING is emitted exactly once per flow during the execution of 
LaunchDagProc
+    Mockito.verify(this.mockedEventSubmitter, Mockito.times(1))
+        .submit(eq(TimingEvent.FlowTimings.FLOW_RUNNING), anyMap());
   }
 
   // This creates a dag like this
diff --git a/gradle/scripts/dependencyDefinitions.gradle 
b/gradle/scripts/dependencyDefinitions.gradle
index 3847f9fc06..966ef824d7 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -204,6 +204,8 @@ ext.externalDependency = [
     'parquetAvro': 'org.apache.parquet:parquet-avro:1.11.0',
     'parquetProto': 'org.apache.parquet:parquet-protobuf:1.11.0',
     'parquetHadoop': 'org.apache.parquet:parquet-hadoop-bundle:1.11.0',
+    'powerMockApi' : 'org.powermock:powermock-api-mockito2:2.0.9',
+    'powerMockModule' : 'org.powermock:powermock-module-junit4:2.0.9',
     'reactivex': 'io.reactivex.rxjava2:rxjava:2.1.0',
     "slf4j": [
         "org.slf4j:slf4j-api:" + slf4jVersion,

Reply via email to