This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 227f39b07 [GOBBLIN-2017] handle multiple next jobs to run in dag procs
(#3965)
227f39b07 is described below
commit 227f39b0740d85b07e95f2fa87e7bcf0c834c5da
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Tue Jun 11 18:08:33 2024 -0700
[GOBBLIN-2017] handle multiple next jobs to run in dag procs (#3965)
* handle multiple next jobs to run
* fix test
---
.../modules/orchestration/proc/DagProcUtils.java | 26 +++++
.../modules/orchestration/proc/LaunchDagProc.java | 28 +-----
.../orchestration/proc/ReevaluateDagProc.java | 71 +++++---------
.../modules/orchestration/proc/ResumeDagProc.java | 21 +----
.../service/modules/spec/JobExecutionPlan.java | 4 +
.../orchestration/proc/LaunchDagProcTest.java | 76 +++++++++++----
.../orchestration/proc/ReevaluateDagProcTest.java | 105 ++++++++++++++++++---
.../scheduler/GobblinServiceJobSchedulerTest.java | 47 +++++----
8 files changed, 241 insertions(+), 137 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 91bdbf68d..c90907d23 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -55,6 +56,29 @@ import static
org.apache.gobblin.service.ExecutionStatus.CANCELLED;
*/
@Slf4j
public class DagProcUtils {
+
+ /**
+ * If there is a single job to run next, it runs it. If there are multiple
jobs to run, it creates a
+ * {@link
org.apache.gobblin.service.modules.orchestration.DagActionStore.DagActionType#REEVALUATE}
dag action for
+ * each of them, and those jobs will be launched in their respective {@link
ReevaluateDagProc}.
+ */
+ public static void submitNextNodes(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag,
+ DagManager.DagId dagId) throws IOException {
+ Set<Dag.DagNode<JobExecutionPlan>> nextNodes =
DagManagerUtils.getNext(dag);
+
+ if (nextNodes.size() == 1) {
+ Dag.DagNode<JobExecutionPlan> dagNode = nextNodes.iterator().next();
+ DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode,
dagId);
+ log.info("Submitted job {} for dagId {}",
DagManagerUtils.getJobName(dagNode), dagId);
+ } else {
+ for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
+ JobExecutionPlan jobExecutionPlan = dagNode.getValue();
+
dagManagementStateStore.addJobDagAction(jobExecutionPlan.getFlowGroup(),
jobExecutionPlan.getFlowName(),
+ String.valueOf(jobExecutionPlan.getFlowExecutionId()),
jobExecutionPlan.getJobName(), DagActionStore.DagActionType.REEVALUATE);
+ }
+ }
+ }
+
/**
* - submits a {@link JobSpec} to a {@link SpecExecutor}
* - emits a {@link TimingEvent.LauncherTimings#JOB_ORCHESTRATED} {@link
org.apache.gobblin.metrics.GobblinTrackingEvent}
@@ -122,6 +146,8 @@ public class DagProcUtils {
}
throw new RuntimeException(e);
}
+
+ log.info("Submitted job {} for dagId {}",
DagManagerUtils.getJobName(dagNode), dagId);
}
public static void cancelDagNode(Dag.DagNode<JobExecutionPlan>
dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws
IOException {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index d3a7614cc..c2d840ef7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
@@ -57,6 +56,10 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
this.flowCompilationValidationHelper = flowCompilationValidationHelper;
}
+ /**
+ * It retrieves the {@link FlowSpec} for the dag this dag proc corresponds
to, from the {@link DagManagementStateStore}
+ * and compiles it to create a {@link Dag} and saves it in the {@link
DagManagementStateStore}.
+ */
@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
throws IOException {
@@ -80,32 +83,11 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
log.warn("Dag with id " + getDagId() + " could not be compiled.");
// todo - add metrics
} else {
- submitNextNodes(dagManagementStateStore, dag.get());
+ DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(),
getDagId());
// Checkpoint the dag state, it should have an updated value of dag nodes
dagManagementStateStore.checkpointDag(dag.get());
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore,
getDagTask().getDagAction());
orchestrationDelayCounter.set(System.currentTimeMillis() -
DagManagerUtils.getFlowExecId(dag.get()));
}
}
-
- /**
- * Submit next set of Dag nodes in the provided Dag.
- */
- private void submitNextNodes(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag) {
- Set<Dag.DagNode<JobExecutionPlan>> nextNodes =
DagManagerUtils.getNext(dag);
-
- if (nextNodes.size() > 1) {
- handleMultipleJobs(nextNodes);
- }
-
- //Submit jobs from the dag ready for execution.
- for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
- DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode,
getDagId());
- log.info("Submitted job {} for dagId {}",
DagManagerUtils.getJobName(dagNode), getDagId());
- }
- }
-
- private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>>
nextNodes) {
- throw new UnsupportedOperationException("More than one start job is not
allowed");
- }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 6fe42fcc4..4359a6d88 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -19,9 +19,7 @@ package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
import java.util.Optional;
-import java.util.Set;
-import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.quartz.SchedulerException;
@@ -57,25 +55,7 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
@Override
protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
initialize(DagManagementStateStore dagManagementStateStore)
throws IOException {
- Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
dagNodeWithJobStatus =
- dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
-
- if (!dagNodeWithJobStatus.getLeft().isPresent() ||
!dagNodeWithJobStatus.getRight().isPresent()) {
- // this is possible when MALA malfunctions and a duplicated reevaluate
dag proc is launched for a dag node that is
- // already "reevaluated" and cleaned up.
- return ImmutablePair.of(Optional.empty(), Optional.empty());
- }
-
- ExecutionStatus executionStatus =
ExecutionStatus.valueOf(dagNodeWithJobStatus.getRight().get().getEventName());
- if
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
- log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should
have been created only for finished status - {}",
- dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES);
- // this may happen if adding job status in the store failed after adding
a ReevaluateDagAction in KafkaJobStatusMonitor
- throw new RuntimeException(String.format("Job status %s is not final for
job %s", executionStatus, getDagId()));
- }
-
- setStatus(dagManagementStateStore, dagNodeWithJobStatus.getLeft().get(),
executionStatus);
- return dagNodeWithJobStatus;
+ return dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
}
@Override
@@ -90,9 +70,30 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
}
Dag.DagNode<JobExecutionPlan> dagNode =
dagNodeWithJobStatus.getLeft().get();
- JobStatus jobStatus = dagNodeWithJobStatus.getRight().get();
- ExecutionStatus executionStatus = dagNode.getValue().getExecutionStatus();
+
+ if (!dagNodeWithJobStatus.getRight().isPresent()) {
+ // Usually reevaluate dag action is created by JobStatusMonitor when a
finished job status is available,
+ // but when reevaluate/resume/launch dag proc found multiple parallel
jobs to run next, it creates reevaluate
+ dag actions for each of those parallel jobs, and in that scenario there
is no job status available yet.
+ // If the job status is not present, this job was never launched, submit
it now.
+ DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode,
getDagId());
+ return;
+ }
+
Dag<JobExecutionPlan> dag =
dagManagementStateStore.getDag(getDagId()).get();
+ JobStatus jobStatus = dagNodeWithJobStatus.getRight().get();
+ ExecutionStatus executionStatus =
ExecutionStatus.valueOf(jobStatus.getEventName());
+ setStatus(dagManagementStateStore, dagNode, executionStatus);
+
+ if
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
+ log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should
have been created only for finished status - {}",
+ dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES);
+ // this may happen if adding job status in the store failed after adding
a ReevaluateDagAction in KafkaJobStatusMonitor
+ throw new RuntimeException(String.format("Job status for dagNode %s is
%s. Re-evaluate dag actions are created for"
+ + " new jobs with no job status when there are multiple of them
to run next; or when a job finishes with status - %s",
dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES));
+ }
+
onJobFinish(dagManagementStateStore, dagNode, executionStatus, dag);
if (jobStatus.isShouldRetry()) {
@@ -134,6 +135,7 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
if (node.getValue().getId().equals(dagNodeId)) {
node.getValue().setExecutionStatus(executionStatus);
+ dagManagementStateStore.addDagNodeState(node, getDagId());
dagManagementStateStore.checkpointDag(dag);
return;
}
@@ -165,7 +167,7 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
break;
case COMPLETE:
dagManagementStateStore.getDagManagerMetrics().incrementExecutorSuccess(dagNode);
- submitNextNodes(dagManagementStateStore, dag);
+ DagProcUtils.submitNextNodes(dagManagementStateStore, dag, getDagId());
break;
default:
log.warn("It should not reach here. Job status {} is unexpected.",
executionStatus);
@@ -176,27 +178,6 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
dagManagementStateStore.deleteDagNodeState(getDagId(), dagNode);
}
- /**
- * Submit next set of Dag nodes in the Dag identified by the provided dagId
- */
- private void submitNextNodes(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag) {
- Set<Dag.DagNode<JobExecutionPlan>> nextNodes =
DagManagerUtils.getNext(dag);
-
- if (nextNodes.size() > 1) {
- handleMultipleJobs(nextNodes);
- }
-
- if (!nextNodes.isEmpty()) {
- Dag.DagNode<JobExecutionPlan> nextNode =
nextNodes.stream().findFirst().get();
- DagProcUtils.submitJobToExecutor(dagManagementStateStore, nextNode,
getDagId());
- log.info("Submitted job {} for dagId {}",
DagManagerUtils.getJobName(nextNode), getDagId());
- }
- }
-
- private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>>
nextNodes) {
- throw new UnsupportedOperationException("More than one start job is not
allowed");
- }
-
private void
removeFlowFinishDeadlineTriggerAndDagAction(DagManagementStateStore
dagManagementStateStore) {
DagActionStore.DagAction enforceFlowFinishDeadlineDagAction =
DagActionStore.DagAction.forFlow(getDagNodeId().getFlowGroup(),
getDagNodeId().getFlowName(),
String.valueOf(getDagNodeId().getFlowExecutionId()),
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
index b3d077c46..ddc100ae8 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import com.google.common.collect.Maps;
@@ -92,26 +91,8 @@ public class ResumeDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
// if it fails here, it will check point the failed dag in the (running)
dag store again, which is idempotent
dagManagementStateStore.deleteFailedDag(failedDag.get());
- resumeDag(dagManagementStateStore, failedDag.get());
+ DagProcUtils.submitNextNodes(dagManagementStateStore, failedDag.get(),
getDagId());
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore,
getDagTask().getDagAction());
}
-
- private void resumeDag(DagManagementStateStore dagManagementStateStore,
Dag<JobExecutionPlan> dag) {
- Set<Dag.DagNode<JobExecutionPlan>> nextNodes =
DagManagerUtils.getNext(dag);
-
- if (nextNodes.size() > 1) {
- handleMultipleJobs(nextNodes);
- }
-
- //Submit jobs from the dag ready for execution.
- for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
- DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode,
getDagId());
- log.info("Submitted job {} for dagId {}",
DagManagerUtils.getJobName(dagNode), getDagId());
- }
- }
-
- private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>>
nextNodes) {
- throw new UnsupportedOperationException("More than one start job is not
allowed");
- }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 7210c6725..72e545104 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -68,6 +68,10 @@ public class JobExecutionPlan {
private final JobSpec jobSpec;
private final SpecExecutor specExecutor;
+ // the field is only read in one place, in ResumeDagProc; in other places, a
dag node's status is queried
+ // through API DagManagementStateStore#getDagNodeWithJobStatus which
retrieves status directly from JobStatusRetriever
+ // todo - we should either keep this field updated as soon as a status is
received so it can be used everywhere
+ // or consider removing it completely
private ExecutionStatus executionStatus = ExecutionStatus.PENDING;
private final int maxAttempts;
private int currentGeneration = 1;
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 6e723a716..b4c5e0109 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -22,6 +22,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutionException;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -39,13 +40,16 @@ import
org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
@@ -82,41 +86,75 @@ public class LaunchDagProcTest {
}
@Test
- public void launchDag()
- throws IOException, InterruptedException, URISyntaxException {
- Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1",
System.currentTimeMillis(), DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
- 5, "user5",
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef("fg")));
+ public void launchDag() throws IOException, InterruptedException,
URISyntaxException, ExecutionException {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ String flowExecutionId = "12345";
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1",
Long.parseLong(flowExecutionId),
+ DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), 5, "user5",
ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName)));
FlowCompilationValidationHelper flowCompilationValidationHelper =
mock(FlowCompilationValidationHelper.class);
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
+ SpecProducer<Spec> specProducer =
DagManagerUtils.getSpecProducer(dag.getNodes().get(0));
+ List<SpecProducer<Spec>> specProducers =
ReevaluateDagProcTest.getDagSpecProducers(dag);
LaunchDagProc launchDagProc = new LaunchDagProc(
- new LaunchDagTask(new DagActionStore.DagAction("fg", "fn", "12345",
- "jn", DagActionStore.DagActionType.LAUNCH), null,
this.dagManagementStateStore),
+ new LaunchDagTask(new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, "job0",
+ DagActionStore.DagActionType.LAUNCH), null,
this.dagManagementStateStore),
flowCompilationValidationHelper);
launchDagProc.process(this.dagManagementStateStore);
- int expectedNumOfSavingDagNodeStates = 1; // = number of start nodes
- Assert.assertEquals(expectedNumOfSavingDagNodeStates,
-
Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
- .filter(a ->
a.getMethod().getName().equals("addDagNodeState")).count());
- Mockito.verify(this.dagManagementStateStore, Mockito.times(1))
+ int numOfLaunchedJobs = 1; // = number of start nodes
+ long addSpecCount = specProducers.stream()
+ .mapToLong(p -> Mockito.mockingDetails(p)
+ .getInvocations()
+ .stream()
+ .filter(a -> a.getMethod().getName().equals("addSpec"))
+ .count())
+ .sum();
+ Mockito.verify(specProducer,
Mockito.times(numOfLaunchedJobs)).addSpec(any());
+ Assert.assertEquals(numOfLaunchedJobs, addSpecCount);
+
+ Mockito.verify(this.dagManagementStateStore,
Mockito.times(numOfLaunchedJobs))
.addFlowDagAction(any(), any(), any(),
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
}
+ @Test
+ public void launchDagWithMultipleParallelJobs() throws IOException,
InterruptedException, URISyntaxException {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ String flowExecutionId = "12345";
+ Dag<JobExecutionPlan> dag =
buildDagWithMultipleNodesAtDifferentLevels("1", flowExecutionId,
+ DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),"user5",
ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName)));
+ FlowCompilationValidationHelper flowCompilationValidationHelper =
mock(FlowCompilationValidationHelper.class);
+
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
+ LaunchDagProc launchDagProc = new LaunchDagProc(
+ new LaunchDagTask(new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId,
+ "jn", DagActionStore.DagActionType.LAUNCH), null,
this.dagManagementStateStore),
+ flowCompilationValidationHelper);
+
+ launchDagProc.process(this.dagManagementStateStore);
+ int numOfLaunchedJobs = 3; // = number of start nodes
+ // parallel jobs are launched through reevaluate dag action
+ Mockito.verify(this.dagManagementStateStore,
Mockito.times(numOfLaunchedJobs))
+ .addJobDagAction(eq(flowGroup), eq(flowName), eq(flowExecutionId),
any(), eq(DagActionStore.DagActionType.REEVALUATE));
+ }
+
// This creates a dag like this
// D1 D2 D3
// \ | /
// DN4
- // / | \
- // D5 D6 D7
+ // / \
+ // D5 D6
- // Not used now, but can be used in GOBBLIN-2017
- public static Dag<JobExecutionPlan>
buildDagWithMultipleNodesAtDifferentLevels(String id, Long flowExecutionId,
String flowFailureOption,
- String proxyUser, Config additionalConfig)
- throws URISyntaxException {
+ public static Dag<JobExecutionPlan>
buildDagWithMultipleNodesAtDifferentLevels(String id, String flowExecutionId,
+ String flowFailureOption, String proxyUser, Config additionalConfig)
throws URISyntaxException {
List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
- for (int i = 0; i < 7; i++) {
+ for (int i = 0; i < 6; i++) {
String suffix = Integer.toString(i);
Config jobConfig = ConfigBuilder.create().
addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id).
@@ -129,7 +167,7 @@ public class LaunchDagProcTest {
jobConfig = additionalConfig.withFallback(jobConfig);
if (i == 3) {
jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job0,job1,job2"));
- } else if ((i == 4) || (i == 5) || (i == 6)) {
+ } else if ((i == 4) || (i == 5)) {
jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job3"));
}
JobSpec js = JobSpec.builder("test_job" +
suffix).withVersion(suffix).withConfig(jobConfig).
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index eb3429597..c5ac11f69 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -57,6 +57,7 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.JobStatus;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -110,13 +111,7 @@ public class ReevaluateDagProcTest {
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
);
- List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
- try {
- return DagManagerUtils.getSpecProducer(n);
- } catch (ExecutionException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- }).collect(Collectors.toList());
+ List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
message("Test
message").eventName(ExecutionStatus.COMPLETE.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
@@ -168,13 +163,7 @@ public class ReevaluateDagProcTest {
doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
doReturn(true).when(dagManagementStateStore).releaseQuota(any());
- List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
- try {
- return DagManagerUtils.getSpecProducer(n);
- } catch (ExecutionException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- }).collect(Collectors.toList());
+ List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
long addSpecCount = specProducers.stream()
.mapToLong(p -> Mockito.mockingDetails(p)
@@ -206,4 +195,92 @@ public class ReevaluateDagProcTest {
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
.filter(a ->
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
}
+
+ @Test
+ public void testCurrentJobToRun() throws Exception {
+ String flowName = "fn3";
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 2, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ );
+ List<Dag.DagNode<JobExecutionPlan>> startDagNodes = dag.getStartNodes();
+ List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
+
+ doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+ doReturn(new ImmutablePair<>(Optional.of(startDagNodes.get(0)),
Optional.empty()))
+ .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+
+ ReevaluateDagProc
+ reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
+ String.valueOf(flowExecutionId), "job0",
DagActionStore.DagActionType.REEVALUATE), null,
+ dagManagementStateStore));
+ reEvaluateDagProc.process(dagManagementStateStore);
+
+ int numOfLaunchedJobs = 1; // only the current job
+ // only the current job should have run
+ Mockito.verify(specProducers.get(0), Mockito.times(1)).addSpec(any());
+
+ specProducers.stream().skip(numOfLaunchedJobs) // separately verified
`specProducers.get(0)`
+ .forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any()));
+
+ // no job's state is deleted, because that happens only when a finished job
triggers the reevaluate dag proc
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagNodeState(any(), any());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(any());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), any(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ Mockito.verify(dagActionReminderScheduler,
Mockito.never()).unscheduleReminderJob(any());
+ }
+
+ @Test
+ public void testMultipleNextJobToRun() throws Exception {
+ String flowName = "fn4";
+ Dag<JobExecutionPlan> dag =
LaunchDagProcTest.buildDagWithMultipleNodesAtDifferentLevels("1",
String.valueOf(flowExecutionId),
+ DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), "user5",
ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ );
+ JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup)
+ .jobName("job3").flowExecutionId(flowExecutionId).message("Test
message").eventName(ExecutionStatus.COMPLETE.name())
+
.startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+
+ doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+ doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus)))
+ .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+
+ // mocked job status for the first four jobs
+
dag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+
dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+
dag.getNodes().get(2).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+
dag.getNodes().get(3).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+
+ doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+
+ ReevaluateDagProc
+ reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
+ String.valueOf(flowExecutionId), "job3",
DagActionStore.DagActionType.REEVALUATE), null, dagManagementStateStore));
+ List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
+ // process 4th job
+ reEvaluateDagProc.process(dagManagementStateStore);
+
+ int numOfLaunchedJobs = 2; // = number of jobs that should launch when 4th
job passes, i.e. 5th and 6th job
+ // parallel jobs are launched through reevaluate dag action
+ Mockito.verify(dagManagementStateStore, Mockito.times(numOfLaunchedJobs))
+ .addJobDagAction(eq(flowGroup), eq(flowName),
eq(String.valueOf(flowExecutionId)), any(),
eq(DagActionStore.DagActionType.REEVALUATE));
+
+ // when there are parallel jobs to launch, they are not directly sent to
spec producers, instead reevaluate dag action is created
+ specProducers.forEach(sp -> Mockito.verify(sp,
Mockito.never()).addSpec(any()));
+ }
+
+ public static List<SpecProducer<Spec>>
getDagSpecProducers(Dag<JobExecutionPlan> dag) {
+ return dag.getNodes().stream().map(n -> {
+ try {
+ return DagManagerUtils.getSpecProducer(n);
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+ }
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index dfc7d3a83..6c709d1bd 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -17,15 +17,18 @@
package org.apache.gobblin.service.modules.scheduler;
import java.io.File;
+import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
import org.mockito.invocation.Invocation;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
@@ -67,7 +70,10 @@ import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.gobblin.util.ConfigUtils;
import static
org.apache.gobblin.runtime.spec_catalog.FlowCatalog.FLOWSPEC_STORE_DIR_KEY;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -112,7 +118,7 @@ public class GobblinServiceJobSchedulerTest {
Properties properties = new Properties();
properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
FlowCatalog flowCatalog = new
FlowCatalog(ConfigUtils.propertiesToConfig(properties));
- SpecCatalogListener mockListener = Mockito.mock(SpecCatalogListener.class);
+ SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
flowCatalog.addListener(mockListener);
@@ -129,14 +135,14 @@ public class GobblinServiceJobSchedulerTest {
Assert.assertEquals(flowCatalog.getSpecs().size(), 2);
- Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+ Orchestrator mockOrchestrator = mock(Orchestrator.class);
UserQuotaManager quotaManager = new InMemoryUserQuotaManager(quotaConfig);
// Mock a GaaS scheduler.
TestGobblinServiceJobScheduler scheduler = new
TestGobblinServiceJobScheduler("testscheduler",
ConfigFactory.empty(), Optional.of(flowCatalog), mockOrchestrator,
Optional.of(quotaManager), null, false);
- SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
+ SpecCompiler mockCompiler = mock(SpecCompiler.class);
Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
Mockito.doAnswer((Answer<Void>) a -> {
scheduler.isCompilerHealthy = true;
@@ -196,7 +202,7 @@ public class GobblinServiceJobSchedulerTest {
ServiceBasedAppLauncher serviceLauncher = new
ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
// Assume that the catalog can store corrupted flows
- SpecCatalogListener mockListener = Mockito.mock(SpecCatalogListener.class);
+ SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
flowCatalog.addListener(mockListener);
@@ -216,13 +222,13 @@ public class GobblinServiceJobSchedulerTest {
Assert.assertEquals(flowCatalog.getSpecs().size(), 3);
- Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+ Orchestrator mockOrchestrator = mock(Orchestrator.class);
// Mock a GaaS scheduler.
TestGobblinServiceJobScheduler scheduler = new
TestGobblinServiceJobScheduler("testscheduler",
ConfigFactory.empty(), Optional.of(flowCatalog), mockOrchestrator,
Optional.of(new InMemoryUserQuotaManager(quotaConfig)), null, false);
- SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
+ SpecCompiler mockCompiler = mock(SpecCompiler.class);
Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
Mockito.doAnswer((Answer<Void>) a -> {
scheduler.isCompilerHealthy = true;
@@ -260,7 +266,7 @@ public class GobblinServiceJobSchedulerTest {
ServiceBasedAppLauncher serviceLauncher = new
ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
// Assume that the catalog can store corrupted flows
- SpecCatalogListener mockListener = Mockito.mock(SpecCatalogListener.class);
+ SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
flowCatalog.addListener(mockListener);
@@ -279,7 +285,7 @@ public class GobblinServiceJobSchedulerTest {
Assert.assertEquals(flowCatalog.getSpecs().size(), 3);
- Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+ Orchestrator mockOrchestrator = mock(Orchestrator.class);
SchedulerService schedulerService = new SchedulerService(new Properties());
// Mock a GaaS scheduler.
TestGobblinServiceJobScheduler scheduler = new
TestGobblinServiceJobScheduler("testscheduler",
@@ -287,7 +293,7 @@ public class GobblinServiceJobSchedulerTest {
schedulerService.startAsync().awaitRunning();
scheduler.startUp();
- SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
+ SpecCompiler mockCompiler = mock(SpecCompiler.class);
Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
Mockito.doAnswer((Answer<Void>) a -> {
scheduler.isCompilerHealthy = true;
@@ -342,8 +348,8 @@ public class GobblinServiceJobSchedulerTest {
FlowSpec flowSpec1 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"),
"flowName1", "group1",
ConfigFactory.empty(), true);
- Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
- SpecCompiler mockSpecCompiler = Mockito.mock(SpecCompiler.class);
+ Orchestrator mockOrchestrator = mock(Orchestrator.class);
+ SpecCompiler mockSpecCompiler = mock(SpecCompiler.class);
when(mockOrchestrator.getSpecCompiler()).thenReturn(mockSpecCompiler);
Dag<JobExecutionPlan> mockDag0 = this.buildDag(flowSpec0.getConfig(), "0");
Dag<JobExecutionPlan> mockDag1 = this.buildDag(flowSpec1.getConfig(), "1");
@@ -354,7 +360,7 @@ public class GobblinServiceJobSchedulerTest {
// Mock a GaaS scheduler not in warm standby mode
GobblinServiceJobScheduler scheduler = new
GobblinServiceJobScheduler("testscheduler",
ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
mockOrchestrator, schedulerService,
- Optional.of(new InMemoryUserQuotaManager(quotaConfig)),
Optional.absent(), false, Optional.of(Mockito.mock(
+ Optional.of(new InMemoryUserQuotaManager(quotaConfig)),
Optional.absent(), false, Optional.of(mock(
FlowLaunchHandler.class)));
schedulerService.startAsync().awaitRunning();
@@ -379,12 +385,21 @@ public class GobblinServiceJobSchedulerTest {
schedulerWithWarmStandbyEnabled.startUp();
schedulerWithWarmStandbyEnabled.setActive(true);
+ MockitoAnnotations.openMocks(this);
+ Map<String, FlowSpec> mockMap = spy(new HashMap<>());
+
+ // Use reflection to set the private map field to the spied one, so we can
check the invocations
+ Field mapField =
schedulerWithWarmStandbyEnabled.getClass().getDeclaredField("scheduledFlowSpecs");
+ mapField.setAccessible(true);
+ mapField.set(schedulerWithWarmStandbyEnabled, mockMap);
+
schedulerWithWarmStandbyEnabled.onAddSpec(flowSpec0); //Ignore the
response for this request
-
Assert.assertEquals(schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size(),
1);
+ Mockito.verify(mockMap,
Mockito.times(1)).put(eq(flowSpec0.getUri().toString()), eq(flowSpec0));
schedulerWithWarmStandbyEnabled.onAddSpec(flowSpec1);
// Second flow should be added to scheduled flows since no quota check in
this case
-
AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(5000L).backoffFactor(1)
- .assertTrue(input ->
schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size() == 2, "Waiting for
add spec to complete");
+  // verify the map's put() was invoked to confirm the spec was ever
scheduled; we cannot just check the size
+  // because the spec is removed by NonScheduledJobRunner
+ Mockito.verify(mockMap,
Mockito.times(1)).put(eq(flowSpec1.getUri().toString()), eq(flowSpec1));
// set scheduler to be inactive and unschedule flows
schedulerWithWarmStandbyEnabled.setActive(false);
Assert.assertEquals(schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size(),
0);
@@ -398,7 +413,7 @@ public class GobblinServiceJobSchedulerTest {
Optional<FlowCatalog> flowCatalog, Orchestrator orchestrator,
Optional<UserQuotaManager> quotaManager,
SchedulerService schedulerService, boolean isWarmStandbyEnabled)
throws Exception {
super(serviceName, config, Optional.absent(), flowCatalog, orchestrator,
schedulerService,
- quotaManager, Optional.absent(), isWarmStandbyEnabled,
Optional.of(Mockito.mock(
+ quotaManager, Optional.absent(), isWarmStandbyEnabled,
Optional.of(mock(
FlowLaunchHandler.class)));
if (schedulerService != null) {
hasScheduler = true;