This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b8be8f63a [GOBBLIN-1784] Only clean dags from the dag manager if a
flow event is received (#3641)
b8be8f63a is described below
commit b8be8f63a7b8d6b5d1feaf9e47f022cded2ac96b
Author: William Lo <[email protected]>
AuthorDate: Thu Feb 16 12:19:19 2023 -0800
[GOBBLIN-1784] Only clean dags from the dag manager if a flow event is
received (#3641)
* Only clean dags from the dag manager if a flow event is received
* Cleanup
* Fix DagManagerFlowTest with cleanup changes
* Address review comments with more accurate logs; fix a bug where retries
did not reset the flow event to be emitted
---
.../gobblin/service/modules/flowgraph/Dag.java | 4 +-
.../service/modules/orchestration/DagManager.java | 125 ++++----
.../modules/orchestration/DagManagerMetrics.java | 14 +-
.../modules/orchestration/DagManagerUtils.java | 6 +-
.../modules/orchestration/DagManagerFlowTest.java | 23 +-
.../modules/orchestration/DagManagerTest.java | 339 ++++++++++++++-------
6 files changed, 326 insertions(+), 185 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index 40a56bc25..edb19d8d1 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -35,7 +35,6 @@ import lombok.Setter;
import org.apache.gobblin.annotation.Alpha;
-
/**
* An implementation of Dag. Assumes that nodes have unique values. Nodes with
duplicate values will produce
* unpredictable behavior.
@@ -53,6 +52,9 @@ public class Dag<T> {
private String message;
@Setter
private String flowEvent;
+ // Time (epoch milliseconds) at which the final flow status event was emitted; used to
avoid emitting many duplicate events
+ @Setter @Getter
+ private long eventEmittedTimeMillis = -1;
public Dag(List<DagNode<T>> dagNodes) {
this.nodes = dagNodes;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 6f26dc3e5..6251ac1d2 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -17,10 +17,6 @@
package org.apache.gobblin.service.modules.orchestration;
-import com.codahale.metrics.Meter;
-import com.google.common.base.Joiner;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@@ -43,15 +39,19 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
+import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigException;
+import com.typesafe.config.ConfigFactory;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@@ -77,6 +77,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KillFlowEvent;
@@ -129,6 +130,8 @@ public class DagManager extends AbstractIdleService {
private static final String DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT = "DAYS";
private static final String FAILED_DAG_RETENTION_TIME =
FAILED_DAG_STATESTORE_PREFIX + ".retention.time";
private static final long DEFAULT_FAILED_DAG_RETENTION_TIME = 7L;
+ // Re-emit the final flow status if not detected within 5 minutes
+ private static final long DAG_FLOW_STATUS_TOLERANCE_TIME_MILLIS =
TimeUnit.MINUTES.toMillis(5);
public static final String FAILED_DAG_POLLING_INTERVAL =
FAILED_DAG_STATESTORE_PREFIX + ".retention.pollingIntervalMinutes";
public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
public static final String DAG_MANAGER_HEARTBEAT =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagManager.heartbeat-%s";
@@ -477,9 +480,8 @@ public class DagManager extends AbstractIdleService {
// dagToJobs holds a map of dagId to running jobs of that dag
final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new
HashMap<>();
final Map<String, Long> dagToSLA = new HashMap<>();
- private final Set<String> failedDagIdsFinishRunning = new HashSet<>();
- private final Set<String> failedDagIdsFinishAllPossible = new HashSet<>();
private final MetricContext metricContext;
+ private final Set<String> dagIdstoClean = new HashSet<>();
private final Optional<EventSubmitter> eventSubmitter;
private final Optional<Timer> jobStatusPolledTimer;
private final AtomicLong orchestrationDelay = new AtomicLong(0);
@@ -760,7 +762,6 @@ public class DagManager extends AbstractIdleService {
* Proceed the execution of each dag node based on job status.
*/
private void pollAndAdvanceDag() throws IOException, ExecutionException,
InterruptedException {
- this.failedDagIdsFinishRunning.clear();
Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted =
Maps.newHashMap();
List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();
@@ -806,6 +807,7 @@ public class DagManager extends AbstractIdleService {
if (jobStatus != null && jobStatus.isShouldRetry()) {
log.info("Retrying job: {}, current attempts: {}, max attempts:
{}", DagManagerUtils.getFullyQualifiedJobName(node),
jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
+ this.jobToDag.get(node).setFlowEvent(null);
submitJob(node);
}
} catch (Exception e) {
@@ -1048,9 +1050,6 @@ public class DagManager extends AbstractIdleService {
/**
* Method that defines the actions to be performed when a job finishes
either successfully or with failure.
* This method updates the state of the dag and performs clean up actions
as necessary.
- * TODO : Dag should have a status field, like JobExecutionPlan has. This
method should update that field,
- * which should be used by cleanup(). It may also remove the need
of failedDagIdsFinishRunning,
- * failedDagIdsFinishAllPossible.
*/
private Map<String, Set<DagNode<JobExecutionPlan>>>
onJobFinish(DagNode<JobExecutionPlan> dagNode)
throws IOException {
@@ -1067,19 +1066,11 @@ public class DagManager extends AbstractIdleService {
switch (jobStatus) {
case FAILED:
dag.setMessage("Flow failed because job " + jobName + " failed");
- if (DagManagerUtils.getFailureOption(dag) ==
FailureOption.FINISH_RUNNING) {
- this.failedDagIdsFinishRunning.add(dagId);
- } else {
- this.failedDagIdsFinishAllPossible.add(dagId);
- }
+ dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED);
dagManagerMetrics.incrementExecutorFailed(dagNode);
return Maps.newHashMap();
case CANCELLED:
- if (DagManagerUtils.getFailureOption(dag) ==
FailureOption.FINISH_RUNNING) {
- this.failedDagIdsFinishRunning.add(dagId);
- } else {
- this.failedDagIdsFinishAllPossible.add(dagId);
- }
+ dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
return Maps.newHashMap();
case COMPLETE:
dagManagerMetrics.incrementExecutorSuccess(dagNode);
@@ -1117,46 +1108,73 @@ public class DagManager extends AbstractIdleService {
* Perform clean up. Remove a dag from the dagstore if the dag is complete
and update internal state.
*/
private void cleanUp() {
- List<String> dagIdstoClean = new ArrayList<>();
- //Clean up failed dags
- for (String dagId : this.failedDagIdsFinishRunning) {
- //Skip monitoring of any other jobs of the failed dag.
- LinkedList<DagNode<JobExecutionPlan>> dagNodeList =
this.dagToJobs.get(dagId);
- while (!dagNodeList.isEmpty()) {
- DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
- deleteJobState(dagId, dagNode);
- }
- Dag<JobExecutionPlan> dag = this.dags.get(dagId);
- String status = TimingEvent.FlowTimings.FLOW_FAILED;
- addFailedDag(dagId, dag);
- log.info("Dag {} has finished with status {}; Cleaning up dag from the
state store.", dagId, status);
- // send an event before cleaning up dag
- DagManagerUtils.emitFlowEvent(this.eventSubmitter,
this.dags.get(dagId), status);
- dagIdstoClean.add(dagId);
- }
+ // Approximate the time when the flow events are emitted to account for
delay when the flow event is received by the job monitor
+ long cleanUpProcessingTime = System.currentTimeMillis();
// Remove dags that are finished and emit their appropriate metrics
for (Map.Entry<String, Dag<JobExecutionPlan>> dagIdKeyPair :
this.dags.entrySet()) {
String dagId = dagIdKeyPair.getKey();
+ // On service restart, we repopulate the dags that are waiting to be
cleaned up
+ if (dagIdstoClean.contains(dagId)) {
+ continue;
+ }
Dag<JobExecutionPlan> dag = dagIdKeyPair.getValue();
- if (!hasRunningJobs(dagId) &&
!this.failedDagIdsFinishRunning.contains(dagId)) {
- String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
- if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
- status = TimingEvent.FlowTimings.FLOW_FAILED;
- addFailedDag(dagId, dag);
- this.failedDagIdsFinishAllPossible.remove(dagId);
+ if ((TimingEvent.FlowTimings.FLOW_FAILED.equals(dag.getFlowEvent()) ||
TimingEvent.FlowTimings.FLOW_CANCELLED.equals(dag.getFlowEvent())) &&
+ DagManagerUtils.getFailureOption(dag) ==
FailureOption.FINISH_RUNNING) {
+ //Skip monitoring of any other jobs of the failed dag.
+ LinkedList<DagNode<JobExecutionPlan>> dagNodeList =
this.dagToJobs.get(dagId);
+ while (!dagNodeList.isEmpty()) {
+ DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
+ deleteJobState(dagId, dagNode);
+ }
+ }
+ if (!hasRunningJobs(dagId)) {
+ // Collect all the dagIds that are finished
+ this.dagIdstoClean.add(dagId);
+ if (dag.getFlowEvent() == null) {
+ // If the dag flow event is not set, then it is successful
+ dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_SUCCEEDED);
} else {
-
dagManagerMetrics.emitFlowSuccessMetrics(DagManagerUtils.getFlowId(this.dags.get(dagId)));
+ addFailedDag(dagId, dag);
}
- log.info("Dag {} has finished with status {}; Cleaning up dag from
the state store.", dagId, status);
// send an event before cleaning up dag
- DagManagerUtils.emitFlowEvent(this.eventSubmitter,
this.dags.get(dagId), status);
- dagIdstoClean.add(dagId);
+ DagManagerUtils.emitFlowEvent(this.eventSubmitter,
this.dags.get(dagId), dag.getFlowEvent());
+ dag.setEventEmittedTimeMillis(cleanUpProcessingTime);
}
}
- for (String dagId: dagIdstoClean) {
- cleanUpDag(dagId);
+ // Only clean up dags after the job status monitor processed the flow
event
+ for (Iterator<String> dagIdIterator = this.dagIdstoClean.iterator();
dagIdIterator.hasNext();) {
+ String dagId = dagIdIterator.next();
+ Dag<JobExecutionPlan> dag = this.dags.get(dagId);
+ JobStatus flowStatus = pollFlowStatus(dag);
+ if (flowStatus != null &&
FlowStatusGenerator.FINISHED_STATUSES.contains(flowStatus.getEventName())) {
+ FlowId flowId = DagManagerUtils.getFlowId(dag);
+ switch(dag.getFlowEvent()) {
+ case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
+ this.dagManagerMetrics.emitFlowSuccessMetrics(flowId);
+ this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId,
FlowState.SUCCESSFUL);
+ break;
+ case TimingEvent.FlowTimings.FLOW_FAILED:
+ this.dagManagerMetrics.emitFlowFailedMetrics(flowId);
+ this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId,
FlowState.FAILED);
+ break;
+ case TimingEvent.FlowTimings.FLOW_CANCELLED:
+ this.dagManagerMetrics.emitFlowSlaExceededMetrics(flowId);
+ this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId,
FlowState.FAILED);
+ break;
+ default:
+ log.warn("Unexpected flow event {} for dag {}",
dag.getFlowEvent(), dagId);
+ }
+ log.info("Dag {} has finished with status {}; Cleaning up dag from
the state store.", dagId, dag.getFlowEvent());
+ cleanUpDag(dagId);
+ dagIdIterator.remove();
+ } else if (cleanUpProcessingTime > dag.getEventEmittedTimeMillis() +
DAG_FLOW_STATUS_TOLERANCE_TIME_MILLIS) {
+ // Re-emit the flow event if the flow status has not been processed
within the DagFlowStatusTolerance time
+ DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag,
dag.getFlowEvent());
+ } else {
+ log.info("Waiting for flow event {} to be emitted before cleaning up
dag {}", dag.getFlowEvent(), dagId);
+ }
}
}
@@ -1164,7 +1182,6 @@ public class DagManager extends AbstractIdleService {
* Add a dag to failed dag state store
*/
private synchronized void addFailedDag(String dagId, Dag<JobExecutionPlan>
dag) {
- FlowId flowId = DagManagerUtils.getFlowId(dag);
try {
log.info("Adding dag " + dagId + " to failed dag state store");
this.failedDagStateStore.writeCheckpoint(this.dags.get(dagId));
@@ -1172,12 +1189,6 @@ public class DagManager extends AbstractIdleService {
log.error("Failed to add dag " + dagId + " to failed dag state store",
e);
}
this.failedDagIds.add(dagId);
- if
(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED.equals(dag.getFlowEvent()))
{
- this.dagManagerMetrics.emitFlowSlaExceededMetrics(flowId);
- } else if
(!TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED.equals(dag.getFlowEvent()))
{
- dagManagerMetrics.emitFlowFailedMetrics(flowId);
- }
- this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId,
DagManager.FlowState.FAILED);
}
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
index a81bf39b1..e9e605b06 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
@@ -17,16 +17,20 @@
package org.apache.gobblin.service.modules.orchestration;
-import com.codahale.metrics.MetricRegistry;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
+
import org.apache.commons.lang3.StringUtils;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareGauge;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 3dd0fb592..152dca00d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -16,8 +16,6 @@
*/
package org.apache.gobblin.service.modules.orchestration;
-import com.google.common.collect.ImmutableMap;
-import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
@@ -28,12 +26,14 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
-import java.util.stream.Collectors;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 9366b7a8a..ecb84d839 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.service.modules.orchestration;
-import com.google.common.base.Optional;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
@@ -26,26 +25,27 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.gobblin.config.ConfigBuilder;
-import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
-import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
import javax.annotation.Nullable;
+import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -168,6 +168,17 @@ public class DagManagerFlowTest {
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow2",
"group2",
flowExecutionId3, "job0", "group2");
+ Mockito.doReturn(DagManagerTest.getMockFlowStatus("flow0", "group0",
flowExecutionId1, String.valueOf(ExecutionStatus.CANCELLED)))
+
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow0",
"group0",
+ flowExecutionId1, JobStatusRetriever.NA_KEY,
JobStatusRetriever.NA_KEY);
+
+ Mockito.doReturn(DagManagerTest.getMockFlowStatus("flow1", "group1",
flowExecutionId2, String.valueOf(ExecutionStatus.CANCELLED)))
+
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow1",
"group1",
+ flowExecutionId2, JobStatusRetriever.NA_KEY,
JobStatusRetriever.NA_KEY);
+
+ Mockito.doReturn(DagManagerTest.getMockFlowStatus("flow2", "group2",
flowExecutionId3, String.valueOf(ExecutionStatus.CANCELLED)))
+
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow2",
"group2",
+ flowExecutionId3, JobStatusRetriever.NA_KEY,
JobStatusRetriever.NA_KEY);
// check removal of dag in dagToJobs map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
assertTrue(input ->
!dagManager.dagManagerThreads[queue1].dagToJobs.containsKey(dagId1),
ERROR_MESSAGE);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 655c4f895..0a572cf26 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -16,9 +16,6 @@
*/
package org.apache.gobblin.service.modules.orchestration;
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.MetricRegistry;
-import com.google.common.base.Optional;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -38,13 +35,15 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.io.FileUtils;
-import org.apache.gobblin.metrics.ServiceMetricNames;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
@@ -55,6 +54,7 @@ import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
@@ -124,7 +124,6 @@ public class DagManagerTest {
failedDagIdsField.setAccessible(true);
this.failedDagIds = (Set<String>)
failedDagIdsField.get(this._dagManagerThread);
}
-
/**
* Create a list of dags with only one node each
* @return a Dag.
@@ -185,6 +184,10 @@ public class DagManagerTest {
return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
}
+ static Iterator<JobStatus> getMockFlowStatus(String flowName, String
flowGroup, Long flowExecutionId, String eventName) {
+ return getMockJobStatus(flowName, flowGroup, flowExecutionId,
JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, eventName);
+ }
+
static Iterator<JobStatus> getMockJobStatus(String flowName, String
flowGroup, Long flowExecutionId, String jobGroup, String jobName, String
eventName) {
return getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup,
jobName, eventName, false, flowExecutionId + 10);
}
@@ -219,6 +222,7 @@ public class DagManagerTest {
Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName,
flowGroup, flowExecutionId, jobName1, flowGroup,
String.valueOf(ExecutionStatus.RUNNING));
Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName,
flowGroup, flowExecutionId, jobName2, flowGroup,
String.valueOf(ExecutionStatus.COMPLETE));
Iterator<JobStatus> jobStatusIterator7 = getMockJobStatus(flowName,
flowGroup, flowExecutionId, jobName1, flowGroup,
String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIterator8 = getMockFlowStatus(flowName,
flowGroup, flowExecutionId, String.valueOf(ExecutionStatus.COMPLETE));
Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(),
Mockito.anyString(),
Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
@@ -228,7 +232,8 @@ public class DagManagerTest {
thenReturn(jobStatusIterator4).
thenReturn(jobStatusIterator5).
thenReturn(jobStatusIterator6).
- thenReturn(jobStatusIterator7);
+ thenReturn(jobStatusIterator7).
+ thenReturn(jobStatusIterator8);
//Run the thread once. Ensure the first job is running
this._dagManagerThread.run();
@@ -311,6 +316,7 @@ public class DagManagerTest {
getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1,
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
Iterator<JobStatus> jobStatusIterator9 =
getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIterator10 = getMockFlowStatus(flowName,
flowGroup, flowExecutionId, String.valueOf(ExecutionStatus.FAILED));
Mockito.when(_jobStatusRetriever
@@ -323,7 +329,8 @@ public class DagManagerTest {
thenReturn(jobStatusIterator6).
thenReturn(jobStatusIterator7).
thenReturn(jobStatusIterator8).
- thenReturn(jobStatusIterator9);
+ thenReturn(jobStatusIterator9).
+ thenReturn(jobStatusIterator10);
//Run the thread once. Ensure the first job is running
this._dagManagerThread.run();
@@ -410,17 +417,21 @@ public class DagManagerTest {
Iterator<JobStatus> jobStatusIterator6 =
getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.FAILED));
Iterator<JobStatus> jobStatusIterator7 =
- getMockJobStatus(flowName, flowGroup, flowExecutionId, "NA_KEY",
"NA_KEY", String.valueOf(ExecutionStatus.PENDING_RESUME));
+ getMockFlowStatus(flowName, flowGroup, flowExecutionId,
String.valueOf(ExecutionStatus.FAILED));
Iterator<JobStatus> jobStatusIterator8 =
- getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ getMockFlowStatus(flowName, flowGroup, flowExecutionId,
String.valueOf(ExecutionStatus.PENDING_RESUME));
Iterator<JobStatus> jobStatusIterator9 =
- getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
Iterator<JobStatus> jobStatusIterator10 =
- getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.PENDING_RESUME));
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
Iterator<JobStatus> jobStatusIterator11 =
- getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.PENDING_RESUME));
Iterator<JobStatus> jobStatusIterator12 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+ Iterator<JobStatus> jobStatusIterator13 =
getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIterator14 =
+ getMockFlowStatus(flowName, flowGroup, flowExecutionId,
String.valueOf(ExecutionStatus.COMPLETE));
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.anyString(),
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
@@ -435,7 +446,9 @@ public class DagManagerTest {
thenReturn(jobStatusIterator9).
thenReturn(jobStatusIterator10).
thenReturn(jobStatusIterator11).
- thenReturn(jobStatusIterator12);
+ thenReturn(jobStatusIterator12).
+ thenReturn(jobStatusIterator13).
+ thenReturn(jobStatusIterator14);
// Run thread until job2 fails
for (int i = 0; i < 4; i++) {
@@ -483,6 +496,7 @@ public class DagManagerTest {
Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName,
flowGroup, flowExecutionId, jobName2, flowGroup,
String.valueOf(ExecutionStatus.RUNNING));
Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName,
flowGroup, flowExecutionId, jobName1, flowGroup,
String.valueOf(ExecutionStatus.COMPLETE));
Iterator<JobStatus> jobStatusIterator7 = getMockJobStatus(flowName,
flowGroup, flowExecutionId, jobName2, flowGroup,
String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIterator8 = getMockFlowStatus(flowName,
flowGroup, flowExecutionId, String.valueOf(ExecutionStatus.COMPLETE));
Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(),
Mockito.anyString(),
Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
@@ -492,7 +506,8 @@ public class DagManagerTest {
thenReturn(jobStatusIterator4).
thenReturn(jobStatusIterator5).
thenReturn(jobStatusIterator6).
- thenReturn(jobStatusIterator7);
+ thenReturn(jobStatusIterator7).
+ thenReturn(jobStatusIterator8);
//Run the thread once. Ensure the first job is running
this._dagManagerThread.run();
@@ -562,7 +577,7 @@ public class DagManagerTest {
Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName,
flowGroup, flowExecutionId, jobName0, flowGroup,
String.valueOf(ExecutionStatus.PENDING_RETRY), true);
Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName,
flowGroup, flowExecutionId, jobName0, flowGroup,
String.valueOf(ExecutionStatus.RUNNING));
Iterator<JobStatus> jobStatusIterator7 = getMockJobStatus(flowName,
flowGroup, flowExecutionId, jobName0, flowGroup,
String.valueOf(ExecutionStatus.FAILED));
-
+ Iterator<JobStatus> jobStatusIterator8 = getMockFlowStatus(flowName,
flowGroup, flowExecutionId, String.valueOf(ExecutionStatus.FAILED));
Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(),
Mockito.anyString(),
Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
@@ -572,7 +587,8 @@ public class DagManagerTest {
thenReturn(jobStatusIterator4).
thenReturn(jobStatusIterator5).
thenReturn(jobStatusIterator6).
- thenReturn(jobStatusIterator7);
+ thenReturn(jobStatusIterator7).
+ thenReturn(jobStatusIterator8);
// Run 4 times, first job fails every time and is retried
for (int i = 0; i < 4; i++) {
@@ -633,17 +649,21 @@ public class DagManagerTest {
Iterator<JobStatus> jobStatusIterator6 =
getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.CANCELLED));
Iterator<JobStatus> jobStatusIterator7 =
- getMockJobStatus(flowName, flowGroup, flowExecutionId, "NA_KEY",
"NA_KEY", String.valueOf(ExecutionStatus.PENDING_RESUME));
- Iterator<JobStatus> jobStatusIterator8 =
- getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ getMockFlowStatus(flowName, flowGroup, flowExecutionId,
String.valueOf(ExecutionStatus.CANCELLED));
+ Iterator<JobStatus> jobStatusIterator8 =
+ getMockFlowStatus(flowName, flowGroup, flowExecutionId,
String.valueOf(ExecutionStatus.PENDING_RESUME));
Iterator<JobStatus> jobStatusIterator9 =
- getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
Iterator<JobStatus> jobStatusIterator10 =
- getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.PENDING_RESUME));
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
Iterator<JobStatus> jobStatusIterator11 =
- getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.PENDING_RESUME));
Iterator<JobStatus> jobStatusIterator12 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+ Iterator<JobStatus> jobStatusIterator13 =
getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIterator14 = getMockFlowStatus(flowName,
flowGroup, flowExecutionId, String.valueOf(ExecutionStatus.COMPLETE));
+
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.anyString(),
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
@@ -658,7 +678,9 @@ public class DagManagerTest {
thenReturn(jobStatusIterator9).
thenReturn(jobStatusIterator10).
thenReturn(jobStatusIterator11).
- thenReturn(jobStatusIterator12);
+ thenReturn(jobStatusIterator12).
+ thenReturn(jobStatusIterator13).
+ thenReturn(jobStatusIterator14);
// Run until job2 cancelled
for (int i = 0; i < 3; i++) {
@@ -706,27 +728,43 @@ public class DagManagerTest {
//Add a dag to the queue of dags
this.queue.offer(dag);
// The start time should be 16 minutes ago, which is past the start SLA so
the job should be cancelled
- Iterator<JobStatus> jobStatusIterator1 =
+ Iterator<JobStatus> jobStatusIteratorFlow0_0 =
getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0,
flowGroup, String.valueOf(ExecutionStatus.ORCHESTRATED),
false, flowExecutionId - Duration.ofMinutes(16).toMillis());
// This is for the second Dag that does not match the SLA so should
schedule normally
- Iterator<JobStatus> jobStatusIterator2 =
+ Iterator<JobStatus> jobStatusIteratorFlow1_0 =
getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, jobName0,
flowGroup1, String.valueOf(ExecutionStatus.ORCHESTRATED),
false, flowExecutionId - Duration.ofMinutes(10).toMillis());
// Let the first job get reported as cancel due to SLA kill on start and
clean up
- Iterator<JobStatus> jobStatusIterator3 =
+ Iterator<JobStatus> jobStatusIteratorFlow0_1 =
getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0,
flowGroup, String.valueOf(ExecutionStatus.CANCELLED),
false, flowExecutionId - Duration.ofMinutes(16).toMillis());
+ Iterator<JobStatus> jobStatusIteratorFlow0_2 =
+ getMockFlowStatus(flowName, flowGroup, flowExecutionId,
String.valueOf(ExecutionStatus.CANCELLED));
// Cleanup the running job that is scheduled normally
- Iterator<JobStatus> jobStatusIterator4 =
+ Iterator<JobStatus> jobStatusIteratorFlow1_1 =
getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, jobName0,
flowGroup1, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIteratorFlow1_2 =
+ getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, "job1",
flowGroup1, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIteratorFlow1_3 =
+ getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, "job2",
flowGroup1, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIteratorFlow1_4 =
+ getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, "job2",
flowGroup1, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIteratorFlow1_5 =
+ getMockFlowStatus(flowName1, flowGroup1, flowExecutionId+1,
String.valueOf(ExecutionStatus.COMPLETE));
Mockito.when(_jobStatusRetriever
- .getJobStatusesForFlowExecution(Mockito.anyString(),
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
- thenReturn(jobStatusIterator1).
- thenReturn(jobStatusIterator2).
- thenReturn(jobStatusIterator3).
- thenReturn(jobStatusIterator4);
+ .getJobStatusesForFlowExecution(Mockito.eq("flow0"),
Mockito.eq("group0"), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
+ thenReturn(jobStatusIteratorFlow0_0).
+ thenReturn(jobStatusIteratorFlow0_1).
+ thenReturn(jobStatusIteratorFlow0_2);
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.eq("flow1"),
Mockito.eq("group1"), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
+ thenReturn(jobStatusIteratorFlow1_0).
+ thenReturn(jobStatusIteratorFlow1_1).
+ thenReturn(jobStatusIteratorFlow1_2).
+
thenReturn(jobStatusIteratorFlow1_3).thenReturn(jobStatusIteratorFlow1_4).thenReturn(jobStatusIteratorFlow1_5);
// Run the thread once. Ensure the first job is running
this._dagManagerThread.run();
@@ -748,6 +786,7 @@ public class DagManagerTest {
// Cleanup
this._dagManagerThread.run();
this._dagManagerThread.run();
+ this._dagManagerThread.run();
Assert.assertEquals(this.dags.size(), 0);
Assert.assertEquals(this.jobToDag.size(), 0);
Assert.assertEquals(this.dagToJobs.size(), 0);
@@ -782,12 +821,20 @@ public class DagManagerTest {
Iterator<JobStatus> jobStatusIterator3 =
getMockJobStatus("flow2", "flow2", flowExecutionId+1, "job0",
"group2", String.valueOf(ExecutionStatus.ORCHESTRATED),
false, startOrchestrationTime);
-
+ Iterator<JobStatus> jobStatusIterator4 =
+ getMockFlowStatus("flow2", "flow2", flowExecutionId+1,
String.valueOf(ExecutionStatus.CANCELLED));
+ Iterator<JobStatus> jobStatusIterator5 =
+ getMockFlowStatus("flow2", "flow2", flowExecutionId+1,
String.valueOf(ExecutionStatus.CANCELLED));
+ Iterator<JobStatus> jobStatusIterator6 =
+ getMockFlowStatus("flow2", "flow2", flowExecutionId+1,
String.valueOf(ExecutionStatus.CANCELLED));
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.anyString(),
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
thenReturn(jobStatusIterator1).
thenReturn(jobStatusIterator2).
- thenReturn(jobStatusIterator3);
+ thenReturn(jobStatusIterator3).
+ thenReturn(jobStatusIterator4).
+ thenReturn(jobStatusIterator5).
+ thenReturn(jobStatusIterator6);
// Run the thread once. All 3 jobs should be emitted an SLA exceeded event
this._dagManagerThread.run();
@@ -836,10 +883,13 @@ public class DagManagerTest {
// Job should have been run normally without breaking on SLA check, so we
can just mark as completed for status
Iterator<JobStatus> jobStatusIterator1 =
getMockJobStatus(flowName, flowGroup, flowExecutionId+1, jobName,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIterator2 =
+ getMockFlowStatus(flowName, flowGroup, flowExecutionId+1,
String.valueOf(ExecutionStatus.COMPLETE));
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.anyString(),
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
- thenReturn(jobStatusIterator1);
+ thenReturn(jobStatusIterator1).
+ thenReturn(jobStatusIterator2);
// Run the thread once. Job should run without crashing thread on SLA
check and cleanup
this._dagManagerThread.run();
@@ -855,31 +905,36 @@ public class DagManagerTest {
this.queue.offer(dagList.get(0));
Config jobConfig0 =
dagList.get(0).getNodes().get(0).getValue().getJobSpec().getConfig();
Config jobConfig1 =
dagList.get(1).getNodes().get(0).getValue().getJobSpec().getConfig();
- Iterator<JobStatus> jobStatusIterator0 =
+ Iterator<JobStatus> jobStatusIteratorFlow0_0 =
getMockJobStatus("flow0", "group0",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
"job0", "group0", String.valueOf(ExecutionStatus.RUNNING));
- Iterator<JobStatus> jobStatusIterator1 =
+ Iterator<JobStatus> jobStatusIteratorFlow1_0 =
getMockJobStatus("flow1", "group1",
Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
"job0", "group1", String.valueOf(ExecutionStatus.FAILED));
+ Iterator<JobStatus> jobStatusIteratorFlow1_1 =
+ getMockFlowStatus("flow1", "group1",
Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
String.valueOf(ExecutionStatus.FAILED));
// Cleanup the running job that is scheduled normally
- Iterator<JobStatus> jobStatusIterator2 =
+ Iterator<JobStatus> jobStatusIteratorFlow0_1 =
getMockJobStatus("flow0", "group0",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
"job0", "group0", String.valueOf(ExecutionStatus.RUNNING));
- Iterator<JobStatus> jobStatusIterator3 =
+ Iterator<JobStatus> jobStatusIteratorFlow0_2 =
getMockJobStatus("flow0", "group0",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
"job0", "group0", String.valueOf(ExecutionStatus.COMPLETE));
-
+ Iterator<JobStatus> jobStatusIteratorFlow0_3 =
+ getMockFlowStatus("flow0", "group0",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
String.valueOf(ExecutionStatus.COMPLETE));
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.eq("flow0"),
Mockito.eq("group0"), Mockito.anyLong(),
Mockito.anyString(), Mockito.anyString()))
- .thenReturn(jobStatusIterator0)
- .thenReturn(jobStatusIterator2)
- .thenReturn(jobStatusIterator3);
+ .thenReturn(jobStatusIteratorFlow0_0)
+ .thenReturn(jobStatusIteratorFlow0_1)
+ .thenReturn(jobStatusIteratorFlow0_2)
+ .thenReturn(jobStatusIteratorFlow0_3);
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.eq("flow1"),
Mockito.eq("group1"), Mockito.anyLong(),
Mockito.anyString(), Mockito.anyString()))
- .thenReturn(jobStatusIterator1);
+ .thenReturn(jobStatusIteratorFlow1_0)
+ .thenReturn(jobStatusIteratorFlow1_1);
this._dagManagerThread.run();
// dag will not be processed due to exceeding the quota, will log a
message and exit out without adding it to dags
@@ -892,6 +947,10 @@ public class DagManagerTest {
"user")).getCount(), 1);
this._dagManagerThread.run(); // cleanup
+
+ Assert.assertEquals(this.dags.size(), 0);
+ Assert.assertEquals(this.jobToDag.size(), 0);
+ Assert.assertEquals(this.dagToJobs.size(), 0);
}
@Test (dependsOnMethods = "testDagManagerQuotaExceeded")
@@ -905,36 +964,44 @@ public class DagManagerTest {
Config jobConfig1 =
dagList.get(1).getNodes().get(0).getValue().getJobSpec().getConfig();
Config jobConfig2 =
dagList.get(1).getNodes().get(0).getValue().getJobSpec().getConfig();
- Iterator<JobStatus> jobStatusIterator0 =
+ Iterator<JobStatus> jobStatusIteratorFlow0_0 =
getMockJobStatus("flow0", "group0",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
"job0", "group0", String.valueOf(ExecutionStatus.RUNNING));
- Iterator<JobStatus> jobStatusIterator1 =
+ Iterator<JobStatus> jobStatusIteratorFlow1_0 =
getMockJobStatus("flow1", "group1",
Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
"job0", "group1", String.valueOf(ExecutionStatus.FAILED));
- Iterator<JobStatus> jobStatusIterator2 =
+ Iterator<JobStatus> jobStatusIteratorFlow0_1 =
getMockJobStatus("flow0", "group0",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
"job0", "group0", String.valueOf(ExecutionStatus.RUNNING));
- Iterator<JobStatus> jobStatusIterator3 =
- getMockJobStatus("flow2", "group2",
Long.valueOf(jobConfig2.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
- "job0", "group2", String.valueOf(ExecutionStatus.FAILED));
- Iterator<JobStatus> jobStatusIterator4 =
+ Iterator<JobStatus> jobStatusIteratorFlow0_2 =
getMockJobStatus("flow0", "group0",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
"job0", "group0", String.valueOf(ExecutionStatus.COMPLETE));
-
+ Iterator<JobStatus> jobStatusIteratorFlow2_0 =
+ getMockJobStatus("flow2", "group2",
Long.valueOf(jobConfig2.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+ "job0", "group2", String.valueOf(ExecutionStatus.FAILED));
+ Iterator<JobStatus> jobStatusIteratorFlow0_3 =
+ getMockFlowStatus("flow0", "group0",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIteratorFlow1_1 =
+ getMockFlowStatus("flow1", "group2",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
String.valueOf(ExecutionStatus.FAILED));
+ Iterator<JobStatus> jobStatusIteratorFlow2_1 =
+ getMockFlowStatus("flow1", "group2",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
String.valueOf(ExecutionStatus.FAILED));
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.eq("flow0"),
Mockito.eq("group0"), Mockito.anyLong(),
Mockito.anyString(), Mockito.anyString()))
- .thenReturn(jobStatusIterator0)
- .thenReturn(jobStatusIterator2)
- .thenReturn(jobStatusIterator4);
+ .thenReturn(jobStatusIteratorFlow0_0)
+ .thenReturn(jobStatusIteratorFlow0_1)
+ .thenReturn(jobStatusIteratorFlow0_2)
+ .thenReturn(jobStatusIteratorFlow0_3);
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.eq("flow1"),
Mockito.eq("group1"), Mockito.anyLong(),
Mockito.anyString(), Mockito.anyString()))
- .thenReturn(jobStatusIterator1);
+ .thenReturn(jobStatusIteratorFlow1_0)
+ .thenReturn(jobStatusIteratorFlow1_1);
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.eq("flow2"),
Mockito.eq("group2"), Mockito.anyLong(),
Mockito.anyString(), Mockito.anyString()))
- .thenReturn(jobStatusIterator3);
+ .thenReturn(jobStatusIteratorFlow2_0)
+ .thenReturn(jobStatusIteratorFlow2_1);
this._dagManagerThread.run();
@@ -946,6 +1013,7 @@ public class DagManagerTest {
// Test case where a job that exceeded a quota would cause a double
decrement after fixing the proxy user name, allowing for more jobs to run
this.queue.offer(dagList.get(2));
this._dagManagerThread.run();
+
// Assert that running dag metrics are only counted once
Assert.assertEquals(allCounters.get(MetricRegistry.name(
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
@@ -958,6 +1026,9 @@ public class DagManagerTest {
ServiceMetricNames.SERVICE_USERS,
"user")).getCount(), 0);
+ Assert.assertEquals(this.dags.size(), 0);
+ Assert.assertEquals(this.jobToDag.size(), 0);
+ Assert.assertEquals(this.dagToJobs.size(), 0);
}
@Test (dependsOnMethods = "testQuotaDecrement")
@@ -967,39 +1038,44 @@ public class DagManagerTest {
this.queue.offer(dagList.get(0));
Config jobConfig0 =
dagList.get(0).getNodes().get(0).getValue().getJobSpec().getConfig();
Config jobConfig1 =
dagList.get(1).getNodes().get(0).getValue().getJobSpec().getConfig();
- Iterator<JobStatus> jobStatusIterator0 =
+ Iterator<JobStatus> jobStatusIteratorFlow0_0 =
getMockJobStatus("flow0", "group0",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
"job0", "group0", String.valueOf(ExecutionStatus.ORCHESTRATED),
true);
// Cleanup the running job that is scheduled normally
- Iterator<JobStatus> jobStatusIterator1 =
+ Iterator<JobStatus> jobStatusIteratorFlow0_1 =
getMockJobStatus("flow0", "group0",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
"job0", "group0", String.valueOf(ExecutionStatus.RUNNING), true);
- Iterator<JobStatus> jobStatusIterator2 =
+ Iterator<JobStatus> jobStatusIteratorFlow0_2 =
getMockJobStatus("flow0", "group0",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
"job0", "group0", String.valueOf(ExecutionStatus.ORCHESTRATED));
- Iterator<JobStatus> jobStatusIterator3 =
+ Iterator<JobStatus> jobStatusIteratorFlow0_3 =
getMockJobStatus("flow0", "group0",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
"job0", "group0", String.valueOf(ExecutionStatus.COMPLETE));
- Iterator<JobStatus> jobStatusIterator4 =
+ Iterator<JobStatus> jobStatusIteratorFlow0_4 =
+ getMockFlowStatus("flow0", "group0",
Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIteratorFlow1_0 =
getMockJobStatus("flow1", "group1",
Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
"job0", "group1", String.valueOf(ExecutionStatus.ORCHESTRATED));
- Iterator<JobStatus> jobStatusIterator5 =
+ Iterator<JobStatus> jobStatusIteratorFlow1_1 =
getMockJobStatus("flow1", "group1",
Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
"job0", "group1", String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIteratorFlow1_2 =
+ getMockFlowStatus("flow1", "group1",
Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
String.valueOf(ExecutionStatus.COMPLETE));
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.eq("flow0"),
Mockito.eq("group0"), Mockito.anyLong(),
Mockito.anyString(), Mockito.anyString()))
- .thenReturn(jobStatusIterator0)
- .thenReturn(jobStatusIterator1)
- .thenReturn(jobStatusIterator2)
- .thenReturn(jobStatusIterator3);
+ .thenReturn(jobStatusIteratorFlow0_0)
+ .thenReturn(jobStatusIteratorFlow0_1)
+ .thenReturn(jobStatusIteratorFlow0_2)
+ .thenReturn(jobStatusIteratorFlow0_3)
+ .thenReturn(jobStatusIteratorFlow0_4);
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.eq("flow1"),
Mockito.eq("group1"), Mockito.anyLong(),
Mockito.anyString(), Mockito.anyString()))
- .thenReturn(jobStatusIterator4)
- .thenReturn(jobStatusIterator5);
-
+ .thenReturn(jobStatusIteratorFlow1_0)
+ .thenReturn(jobStatusIteratorFlow1_1)
+ .thenReturn(jobStatusIteratorFlow1_2);
// Dag1 is running
this._dagManagerThread.run();
SortedMap<String, Counter> allCounters =
metricContext.getParent().get().getCounters();
@@ -1021,6 +1097,13 @@ public class DagManagerTest {
this.queue.offer(dagList.get(1));
this._dagManagerThread.run();
this._dagManagerThread.run(); // cleanup
+ Assert.assertEquals(allCounters.get(MetricRegistry.name(
+ ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.SERVICE_USERS,
+ "user")).getCount(), 0);
+ Assert.assertEquals(this.dags.size(), 0);
+ Assert.assertEquals(this.jobToDag.size(), 0);
+ Assert.assertEquals(this.dagToJobs.size(), 0);
}
@Test (dependsOnMethods = "testQuotasRetryFlow")
@@ -1031,20 +1114,25 @@ public class DagManagerTest {
ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS,
false).build()); //Add a dag to the queue of dags
this.queue.offer(adhocDag);
- Iterator<JobStatus> jobStatusIterator1 =
+ Iterator<JobStatus> jobStatusIteratorFlow0_0 =
getMockJobStatus("flow" + flowId, "group" + flowId, flowId, "job0",
"group0", String.valueOf(ExecutionStatus.COMPLETE));
- Iterator<JobStatus> jobStatusIterator2 =
+ Iterator<JobStatus> jobStatusIteratorFlow0_1 =
+ getMockFlowStatus("flow" + flowId, "group" + flowId, flowId,
String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIteratorFlow1_0 =
getMockJobStatus("flow" + flowId+1, "group" + flowId+1, flowId+1,
"job0", "group0", String.valueOf(ExecutionStatus.COMPLETE));
-
+ Iterator<JobStatus> jobStatusIteratorFlow1_1 =
+ getMockFlowStatus("flow" + flowId+1, "group" + flowId+1, flowId+1,
String.valueOf(ExecutionStatus.COMPLETE));
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.eq("flow" + flowId),
Mockito.eq("group" + flowId), Mockito.anyLong(),
Mockito.anyString(), Mockito.anyString()))
- .thenReturn(jobStatusIterator1);
+ .thenReturn(jobStatusIteratorFlow0_0)
+ .thenReturn(jobStatusIteratorFlow0_1);
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.eq("flow" + (flowId+1)),
Mockito.eq("group" + (flowId+1)), Mockito.anyLong(),
Mockito.anyString(), Mockito.anyString()))
- .thenReturn(jobStatusIterator2);
+ .thenReturn(jobStatusIteratorFlow1_0)
+ .thenReturn(jobStatusIteratorFlow1_1);
String flowStateGaugeName0 =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group"+flowId,
"flow"+flowId, ServiceMetricNames.RUNNING_STATUS);
@@ -1063,6 +1151,9 @@ public class DagManagerTest {
this._dagManagerThread.run();
// should be successful since it should be cleaned up with status complete
Assert.assertEquals(metricContext.getParent().get().getGauges().get(flowStateGaugeName1).getValue(),
DagManager.FlowState.SUCCESSFUL.value);
+ Assert.assertEquals(this.dags.size(), 0);
+ Assert.assertEquals(this.jobToDag.size(), 0);
+ Assert.assertEquals(this.dagToJobs.size(), 0);
}
@Test (dependsOnMethods = "testEmitFlowMetricOnlyIfNotAdhoc")
@@ -1087,33 +1178,48 @@ public class DagManagerTest {
this.queue.offer(dagList.get(1));
this.queue.offer(dagList.get(2));;
// Set orchestration time to be 20 minutes in the past, the job should be
marked as SLA killed
- Iterator<JobStatus> jobStatusIterator1 =
+ Iterator<JobStatus> jobStatusIteratorFlow0_0 =
getMockJobStatus("flow0", "group0", flowExecutionId, "job0", "group0",
String.valueOf(ExecutionStatus.RUNNING),
false, flowExecutionId);
- Iterator<JobStatus> jobStatusIterator2 =
- getMockJobStatus("flow1", "flow1", flowExecutionId, "job0", "group1",
String.valueOf(ExecutionStatus.RUNNING),
+ Iterator<JobStatus> jobStatusIteratorFlow1_0 =
+ getMockJobStatus("flow1", "group1", flowExecutionId, "job0", "group1",
String.valueOf(ExecutionStatus.RUNNING),
false, flowExecutionId);
- Iterator<JobStatus> jobStatusIterator3 =
- getMockJobStatus("flow2", "flow2", flowExecutionId, "job0", "group2",
String.valueOf(ExecutionStatus.RUNNING),
+ Iterator<JobStatus> jobStatusIteratorFlow2_0 =
+ getMockJobStatus("flow2", "group2", flowExecutionId, "job0", "group2",
String.valueOf(ExecutionStatus.RUNNING),
false, flowExecutionId);
- Iterator<JobStatus> jobStatusIterator4 =
- getMockJobStatus("flow0", "flow0", flowExecutionId, "job0", "group0",
String.valueOf(ExecutionStatus.CANCELLED),
+ Iterator<JobStatus> jobStatusIteratorFlow0_1 =
+ getMockJobStatus("flow0", "group0", flowExecutionId, "job0", "group0",
String.valueOf(ExecutionStatus.CANCELLED),
false, flowExecutionId);
- Iterator<JobStatus> jobStatusIterator5 =
- getMockJobStatus("flow1", "flow1", flowExecutionId, "job0", "group1",
String.valueOf(ExecutionStatus.CANCELLED),
+ Iterator<JobStatus> jobStatusIteratorFlow1_1 =
+ getMockJobStatus("flow1", "group1", flowExecutionId, "job0", "group1",
String.valueOf(ExecutionStatus.CANCELLED),
false, flowExecutionId);
- Iterator<JobStatus> jobStatusIterator6 =
- getMockJobStatus("flow2", "flow2", flowExecutionId, "job0", "group2",
String.valueOf(ExecutionStatus.CANCELLED),
+ Iterator<JobStatus> jobStatusIteratorFlow2_1 =
+ getMockJobStatus("flow2", "group2", flowExecutionId, "job0", "group2",
String.valueOf(ExecutionStatus.CANCELLED),
false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIteratorFlow0_2 =
+ getMockFlowStatus("flow0", "group0", flowExecutionId,
String.valueOf(ExecutionStatus.CANCELLED));
+ Iterator<JobStatus> jobStatusIteratorFlow1_2 =
+ getMockFlowStatus("flow1", "group1", flowExecutionId,
String.valueOf(ExecutionStatus.CANCELLED));
+ Iterator<JobStatus> jobStatusIteratorFlow2_2 =
+ getMockFlowStatus("flow2", "group2", flowExecutionId,
String.valueOf(ExecutionStatus.CANCELLED));
Mockito.when(_jobStatusRetriever
- .getJobStatusesForFlowExecution(Mockito.anyString(),
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
- thenReturn(jobStatusIterator1).
- thenReturn(jobStatusIterator2).
- thenReturn(jobStatusIterator3).
- thenReturn(jobStatusIterator4).
- thenReturn(jobStatusIterator5).
- thenReturn(jobStatusIterator6);
+ .getJobStatusesForFlowExecution(Mockito.eq("flow0"),
Mockito.eq("group0"), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
+ thenReturn(jobStatusIteratorFlow0_0).
+ thenReturn(jobStatusIteratorFlow0_1).
+ thenReturn(jobStatusIteratorFlow0_2);
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.eq("flow1"),
Mockito.eq("group1"), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
+ thenReturn(jobStatusIteratorFlow1_0).
+ thenReturn(jobStatusIteratorFlow1_1).
+ thenReturn(jobStatusIteratorFlow1_2);
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.eq("flow2"), Mockito.eq("group2"),
Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+ thenReturn(jobStatusIteratorFlow2_0).
+ thenReturn(jobStatusIteratorFlow2_1).
+ thenReturn(jobStatusIteratorFlow2_2);
// Run the thread once. All 3 jobs should be emitted an SLA exceeded event
this._dagManagerThread.run();
@@ -1126,7 +1232,6 @@ public class DagManagerTest {
String slakilledGroupName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0",
ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName1).getCount(),
2);
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName2).getCount(),
1);
-
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledGroupName).getCount(),
1);
// Cleanup
this._dagManagerThread.run();
Assert.assertEquals(metricContext.getParent().get().getMeters().get(allSlaKilledMeterName).getCount(),
previousSlaKilledCount + 3);
@@ -1160,39 +1265,48 @@ public class DagManagerTest {
this.queue.offer(dagList.get(1));
this.queue.offer(dagList.get(2));;
// The start time should be 16 minutes ago, which is past the start SLA so
the job should be cancelled
- Iterator<JobStatus> jobStatusIterator1 =
+ Iterator<JobStatus> jobStatusIteratorFlow0_0 =
getMockJobStatus( "flow0", "group0", flowExecutionId, "job0",
"group0", String.valueOf(ExecutionStatus.ORCHESTRATED),
false, flowExecutionId);
- Iterator<JobStatus> jobStatusIterator2 =
- getMockJobStatus("flow1", "flow1", flowExecutionId+1, "job0",
"group1", String.valueOf(ExecutionStatus.ORCHESTRATED),
+ Iterator<JobStatus> jobStatusIteratorFlow1_0 =
+ getMockJobStatus("flow1", "group1", flowExecutionId+1, "job0",
"group1", String.valueOf(ExecutionStatus.ORCHESTRATED),
false, flowExecutionId);
- Iterator<JobStatus> jobStatusIterator3 =
- getMockJobStatus("flow2", "flow2", flowExecutionId+1, "job0",
"group2", String.valueOf(ExecutionStatus.ORCHESTRATED),
+ Iterator<JobStatus> jobStatusIteratorFlow2_0 =
+ getMockJobStatus("flow2", "group2", flowExecutionId+1, "job0",
"group2", String.valueOf(ExecutionStatus.ORCHESTRATED),
false, flowExecutionId);
- Iterator<JobStatus> jobStatusIterator4 =
- getMockJobStatus( "flow0", "flow0", flowExecutionId+1, "job0",
"group0", String.valueOf(ExecutionStatus.COMPLETE),
+ Iterator<JobStatus> jobStatusIteratorFlow0_1 =
+ getMockJobStatus( "flow0", "group0", flowExecutionId+1, "job0",
"group0", String.valueOf(ExecutionStatus.COMPLETE),
false, flowExecutionId);
- Iterator<JobStatus> jobStatusIterator5 =
- getMockJobStatus("flow1", "flow1", flowExecutionId+1, "job0",
"group1", String.valueOf(ExecutionStatus.FAILED),
+ Iterator<JobStatus> jobStatusIteratorFlow1_1 =
+ getMockJobStatus("flow1", "group1", flowExecutionId+1, "job0",
"group1", String.valueOf(ExecutionStatus.FAILED),
false, flowExecutionId);
- Iterator<JobStatus> jobStatusIterator6 =
- getMockJobStatus("flow2", "flow2", flowExecutionId+1, "job0",
"group2", String.valueOf(ExecutionStatus.COMPLETE),
+ Iterator<JobStatus> jobStatusIteratorFlow2_1 =
+ getMockJobStatus("flow2", "group2", flowExecutionId+1, "job0",
"group2", String.valueOf(ExecutionStatus.COMPLETE),
false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIteratorFlow0_2 =
+ getMockFlowStatus( "flow0", "group0", flowExecutionId+1,
String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIteratorFlow1_2 =
+ getMockFlowStatus("flow1", "group1", flowExecutionId+1,
String.valueOf(ExecutionStatus.FAILED));
+ Iterator<JobStatus> jobStatusIteratorFlow2_2 =
+ getMockFlowStatus("flow2", "group2", flowExecutionId+1,
String.valueOf(ExecutionStatus.COMPLETE));
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.eq("flow0"),
Mockito.eq("group0"), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
- thenReturn(jobStatusIterator1).
- thenReturn(jobStatusIterator4);
+ thenReturn(jobStatusIteratorFlow0_0).
+ thenReturn(jobStatusIteratorFlow0_1).
+ thenReturn(jobStatusIteratorFlow0_2);
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.eq("flow1"),
Mockito.eq("group1"), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
- thenReturn(jobStatusIterator2).
- thenReturn(jobStatusIterator5);
+ thenReturn(jobStatusIteratorFlow1_0).
+ thenReturn(jobStatusIteratorFlow1_1).
+ thenReturn(jobStatusIteratorFlow1_2);
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.eq("flow2"),
Mockito.eq("group2"), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
- thenReturn(jobStatusIterator3).
- thenReturn(jobStatusIterator6);
+ thenReturn(jobStatusIteratorFlow2_0).
+ thenReturn(jobStatusIteratorFlow2_1).
+ thenReturn(jobStatusIteratorFlow2_2);
this._dagManagerThread.run();
@@ -1213,9 +1327,8 @@ public class DagManagerTest {
Assert.assertEquals(this.dags.size(), 0);
Assert.assertEquals(this.jobToDag.size(), 0);
Assert.assertEquals(this.dagToJobs.size(), 0);
- }
-
+ }
@AfterClass
public void cleanUp() throws Exception {