This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new bee3933 [GOBBLIN-808] implement azkaban flow cancel when dag manager
is enabled
bee3933 is described below
commit bee393352880aad5d9edb05fcc2e5f5f4e5f4811
Author: Arjun <[email protected]>
AuthorDate: Mon Jul 22 12:46:17 2019 -0700
[GOBBLIN-808] implement azkaban flow cancel when dag manager is enabled
Closes #2674 from arjun4084346/shared-gaas-cancel
---
.../apache/gobblin/runtime/api/SpecProducer.java | 9 ++
gobblin-compaction/build.gradle | 2 +
gobblin-runtime/build.gradle | 1 +
.../InMemorySpecProducer.java | 22 ++-
.../spec_executorInstance/MockedSpecExecutor.java | 58 ++++++++
gobblin-service/build.gradle | 1 +
.../service/modules/orchestration/DagManager.java | 161 ++++++++++++++++----
.../modules/orchestration/DagManagerUtils.java | 20 ++-
.../modules/orchestration/Orchestrator.java | 33 ++---
.../scheduler/GobblinServiceJobScheduler.java | 2 +-
.../service/modules/spec/JobExecutionPlan.java | 5 +-
.../spec/JobExecutionPlanListDeserializer.java | 14 ++
.../spec/JobExecutionPlanListSerializer.java | 9 ++
.../modules/spec/SerializationConstants.java | 1 +
.../modules/orchestration/DagManagerFlowTest.java | 164 +++++++++++++++++++++
.../modules/orchestration/DagManagerTest.java | 23 ++-
.../modules/orchestration/DagTestUtils.java | 16 +-
.../modules/orchestration/FSDagStateStoreTest.java | 5 +-
.../orchestration/MysqlDagStateStoreTest.java | 13 +-
.../monitoring/KafkaAvroJobStatusMonitorTest.java | 1 -
20 files changed, 482 insertions(+), 78 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
index 880847d..790e1f4 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
@@ -23,6 +23,7 @@ import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.util.CompletedFuture;
/**
@@ -48,4 +49,12 @@ public interface SpecProducer<V> {
/** List all {@link Spec} being executed on {@link SpecExecutor}. */
Future<? extends List<V>> listSpecs();
+
+ default String serializeAddSpecResponse(Future<?> response) {
+ return "";
+ }
+
+ default Future<?> deserializeAddSpecResponse(String serializedResponse) {
+ return new CompletedFuture(serializedResponse, null);
+ }
}
\ No newline at end of file
diff --git a/gobblin-compaction/build.gradle b/gobblin-compaction/build.gradle
index 88bc161..7f8a450 100644
--- a/gobblin-compaction/build.gradle
+++ b/gobblin-compaction/build.gradle
@@ -29,6 +29,8 @@ dependencies {
// Given orc-mapreduce depends on hive version of hive-storage-api(2.4.0)
and conflicted
// with hive-exec-core in older version(1.0.1), we need to shadow
orc-mapreduce's transitive deps.
+ // and include direct orc-mapreduce library just as a compileOnly dependency
+ compileOnly externalDependency.orcMapreduce
compile project(path: ":gobblin-modules:gobblin-orc-dep",
configuration:"shadow")
compile externalDependency.calciteCore
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index 71dd510..b28db6b 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -74,6 +74,7 @@ dependencies {
compile externalDependency.lombok
compile externalDependency.metricsCore
compile externalDependency.metricsJvm
+ compile externalDependency.mockito
compile externalDependency.pegasus.data
compile externalDependency.quartz
compile externalDependency.slf4j
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
index cc74757..5e2b0bd 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
@@ -22,18 +22,19 @@ import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.util.CompletedFuture;
-import lombok.extern.slf4j.Slf4j;
-
@Slf4j
public class InMemorySpecProducer implements SpecProducer<Spec>, Serializable {
private final Map<URI, Spec> provisionedSpecs;
@@ -80,4 +81,21 @@ public class InMemorySpecProducer implements
SpecProducer<Spec>, Serializable {
public Future<? extends List<Spec>> listSpecs() {
return new
CompletedFuture<>(Lists.newArrayList(provisionedSpecs.values()), null);
}
+
+ @Override
+ public String serializeAddSpecResponse(Future future) {
+ CompletedFuture<Boolean> completedFuture = (CompletedFuture) future;
+
+ try {
+ return completedFuture.get().toString();
+ } catch (ExecutionException e) {
+ log.error("Error during future serialization in {}.", getClass(), e);
+ return "";
+ }
+ }
+
+ @Override
+ public Future<?> deserializeAddSpecResponse(String serializedResponse) {
+ return new CompletedFuture(Boolean.valueOf(serializedResponse), null);
+ }
}
\ No newline at end of file
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
new file mode 100644
index 0000000..10f7786
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_executorInstance;
+
+import java.net.URI;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.mockito.Mockito;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.util.CompletedFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+
+public class MockedSpecExecutor extends InMemorySpecExecutor {
+ private SpecProducer<Spec> mockedSpecProducer;
+
+ public MockedSpecExecutor(Config config) {
+ super(config);
+ this.mockedSpecProducer = Mockito.mock(SpecProducer.class);
+ when(mockedSpecProducer.addSpec(any())).thenReturn(new
CompletedFuture(Boolean.TRUE, null));
+ }
+
+ public static SpecExecutor createDummySpecExecutor(URI uri) {
+ Properties properties = new Properties();
+ properties.setProperty(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
uri.toString());
+ return new MockedSpecExecutor(ConfigFactory.parseProperties(properties));
+ }
+
+ @Override
+ public Future<? extends SpecProducer> getProducer(){
+ return new CompletedFuture(this.mockedSpecProducer, null);
+ }
+}
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index 2701022..bafa6d1 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -30,6 +30,7 @@ dependencies {
compile
project(":gobblin-restli:gobblin-flow-config-service:gobblin-flow-config-service-server")
compile
project(":gobblin-restli:gobblin-flow-config-service:gobblin-flow-config-service-client")
compile project(":gobblin-restli:gobblin-restli-utils")
+ compile project(":gobblin-modules:gobblin-azkaban")
compile externalDependency.avro
compile externalDependency.avroMapredH2
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 2fbb796..e8cf1a4 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -26,9 +26,13 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -42,6 +46,7 @@ import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
@@ -49,6 +54,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
@@ -57,7 +63,7 @@ import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -79,7 +85,8 @@ import static
org.apache.gobblin.service.ExecutionStatus.valueOf;
/**
* This class implements a manager to manage the life cycle of a {@link Dag}.
A {@link Dag} is submitted to the
* {@link DagManager} by the {@link Orchestrator#orchestrate(Spec)} method. On
receiving a {@link Dag}, the
- * {@link DagManager} first persists the {@link Dag} to the {@link
DagStateStore}, and then submits it to a {@link BlockingQueue}.
+ * {@link DagManager} first persists the {@link Dag} to the {@link
DagStateStore}, and then submits it to the specific
+ * {@link DagManagerThread}'s {@link BlockingQueue} based on the
flowExecutionId of the Flow.
* This guarantees that each {@link Dag} received by the {@link DagManager}
can be recovered in case of a leadership
* change or service restart.
*
@@ -89,6 +96,11 @@ import static
org.apache.gobblin.service.ExecutionStatus.valueOf;
* jobs. Upon completion of a job, it will either schedule the next job in the
Dag (on SUCCESS) or mark the Dag as failed
* (on FAILURE). Upon completion of a Dag execution, it will perform the
required clean up actions.
*
+ * For deleteSpec/cancellation requests for a flow URI, {@link DagManager}
finds out the flowExecutionId using
+ * {@link JobStatusRetriever}, and forwards the request to the {@link
DagManagerThread} which handled the addSpec request
+ * for this flow. We need separate {@value queue} and {@value cancelQueue} for
each {@link DagManagerThread} because
+ * cancellation needs the information which is stored only in the same {@link
DagManagerThread}.
+ *
* The {@link DagManager} is active only in the leader mode. To ensure, each
{@link Dag} managed by a {@link DagManager} is
* checkpointed to a persistent location. On start up or leadership change,
* the {@link DagManager} loads all the checkpointed {@link Dag}s and adds
them to the {@link BlockingQueue}.
@@ -105,8 +117,8 @@ public class DagManager extends AbstractIdleService {
private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
private static final Integer DEFAULT_NUM_THREADS = 3;
private static final Integer TERMINATION_TIMEOUT = 30;
- private static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX +
"numThreads";
- private static final String JOB_STATUS_POLLING_INTERVAL_KEY =
DAG_MANAGER_PREFIX + "pollingInterval";
+ public static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX +
"numThreads";
+ public static final String JOB_STATUS_POLLING_INTERVAL_KEY =
DAG_MANAGER_PREFIX + "pollingInterval";
private static final String JOB_STATUS_RETRIEVER_CLASS_KEY =
JOB_STATUS_RETRIEVER_KEY + ".class";
private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS =
FsJobStatusRetriever.class.getName();
private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX +
"dagStateStoreClass";
@@ -136,7 +148,10 @@ public class DagManager extends AbstractIdleService {
}
}
- private BlockingQueue<Dag<JobExecutionPlan>> queue;
+ private BlockingQueue<Dag<JobExecutionPlan>>[] queue;
+ private BlockingQueue<String>[] cancelQueue;
+ DagManagerThread[] dagManagerThreads;
+
private ScheduledExecutorService scheduledExecutorPool;
private boolean instrumentationEnabled;
private DagStateStore dagStateStore;
@@ -144,6 +159,7 @@ public class DagManager extends AbstractIdleService {
private final Integer numThreads;
private final Integer pollingInterval;
+ @Getter
private final JobStatusRetriever jobStatusRetriever;
private final KafkaJobStatusMonitor jobStatusMonitor;
private final Config config;
@@ -153,8 +169,9 @@ public class DagManager extends AbstractIdleService {
public DagManager(Config config, boolean instrumentationEnabled) {
this.config = config;
- this.queue = new LinkedBlockingDeque<>();
this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY,
DEFAULT_NUM_THREADS);
+ this.queue = initializeDagQueue(this.numThreads);
+ this.cancelQueue = initializeDagQueue(this.numThreads);
this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
this.pollingInterval = ConfigUtils.getInt(config,
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
this.instrumentationEnabled = instrumentationEnabled;
@@ -166,15 +183,41 @@ public class DagManager extends AbstractIdleService {
}
try {
- Class jobStatusRetrieverClass =
Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY,
DEFAULT_JOB_STATUS_RETRIEVER_CLASS));
- this.jobStatusMonitor = new
KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
- this.jobStatusRetriever =
- (JobStatusRetriever)
GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass,
config);
+ this.jobStatusMonitor = createJobStatusMonitor(config);
+ this.jobStatusRetriever = createJobStatusRetriever(config);
} catch (ReflectiveOperationException e) {
throw new RuntimeException("Exception encountered during DagManager
initialization", e);
}
}
+ JobStatusRetriever createJobStatusRetriever(Config config) throws
ReflectiveOperationException {
+ Class jobStatusRetrieverClass =
Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY,
DEFAULT_JOB_STATUS_RETRIEVER_CLASS));
+ return (JobStatusRetriever)
GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass,
config);
+ }
+
+ KafkaJobStatusMonitor createJobStatusMonitor(Config config) throws
ReflectiveOperationException {
+ return new KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
+ }
+
+ DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec>
topologySpecMap) {
+ try {
+ Class dagStateStoreClass = Class.forName(ConfigUtils.getString(config,
DAG_STATESTORE_CLASS_KEY, FSDagStateStore.class.getName()));
+ return (DagStateStore)
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config,
topologySpecMap);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // Initializes and returns an array of Queue of size numThreads
+ private static LinkedBlockingDeque[] initializeDagQueue(int numThreads) {
+ LinkedBlockingDeque[] queue = new LinkedBlockingDeque[numThreads];
+
+ for (int i=0; i< numThreads; i++) {
+ queue[i] = new LinkedBlockingDeque<>();
+ }
+ return queue;
+ }
+
public DagManager(Config config) {
this(config, true);
}
@@ -192,14 +235,19 @@ public class DagManager extends AbstractIdleService {
* submitted dag to the {@link DagStateStore} and then adds the dag to a
{@link BlockingQueue} to be picked up
* by one of the {@link DagManagerThread}s.
*/
- synchronized void offer(Dag<JobExecutionPlan> dag) throws IOException {
+ synchronized void addDag(Dag<JobExecutionPlan> dag) throws IOException {
//Persist the dag
this.dagStateStore.writeCheckpoint(dag);
- submitEventsAndSetStatus(dag);
- //Add it to the queue of dags
- if (!this.queue.offer(dag)) {
+ long flowExecutionId = DagManagerUtils.getFlowExecId(dag);
+ int queueId = (int) (flowExecutionId % this.numThreads);
+ // Add the dag to the specific queue determined by flowExecutionId
+ // Flow cancellation request has to be forwarded to the same
DagManagerThread where the
+ // flow create request was forwarded. This is because Azkaban Exec Id is
stored in the DagNode of the
+ // specific DagManagerThread queue
+ if (!this.queue[queueId].offer(dag)) {
throw new IOException("Could not add dag" +
DagManagerUtils.generateDagId(dag) + "to queue");
}
+ submitEventsAndSetStatus(dag);
}
private void submitEventsAndSetStatus(Dag<JobExecutionPlan> dag) {
@@ -213,6 +261,26 @@ public class DagManager extends AbstractIdleService {
}
}
+ /**
+ * Method to submit a {@link URI} for cancellation requests to the {@link
DagManager}.
+ * The {@link DagManager} adds the dag to the {@link BlockingQueue} to be
picked up by one of the {@link DagManagerThread}s.
+ */
+ synchronized public void stopDag(URI uri) throws IOException {
+ String flowGroup =
FlowConfigResourceLocalHandler.FlowUriUtils.getFlowGroup(uri);
+ String flowName =
FlowConfigResourceLocalHandler.FlowUriUtils.getFlowName(uri);
+
+ List<Long> flowExecutionIds =
this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
+ log.info("Found {} flows to cancel.", flowExecutionIds.size());
+
+ for (long flowExecutionId : flowExecutionIds) {
+ int queueId = (int) (flowExecutionId % this.numThreads);
+ String dagId = DagManagerUtils.generateDagId(flowGroup, flowName,
flowExecutionId);
+ if (!this.cancelQueue[queueId].offer(dagId)) {
+ throw new IOException("Could not add dag " + dagId + " to cancellation
queue.");
+ }
+ }
+ }
+
public synchronized void setTopologySpecMap(Map<URI, TopologySpec>
topologySpecMap) {
this.topologySpecMap = topologySpecMap;
}
@@ -234,20 +302,22 @@ public class DagManager extends AbstractIdleService {
log.info("Activating DagManager.");
log.info("Scheduling {} DagManager threads", numThreads);
//Initializing state store for persisting Dags.
- Class dagStateStoreClass = Class.forName(ConfigUtils.getString(config,
DAG_STATESTORE_CLASS_KEY, FSDagStateStore.class.getName()));
- this.dagStateStore = (DagStateStore)
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config,
topologySpecMap);
+ this.dagStateStore = createDagStateStore(config, topologySpecMap);
//On startup, the service creates DagManagerThreads that are scheduled
at a fixed rate.
+ this.dagManagerThreads = new DagManagerThread[numThreads];
for (int i = 0; i < numThreads; i++) {
- this.scheduledExecutorPool.scheduleAtFixedRate(new
DagManagerThread(jobStatusRetriever, dagStateStore, queue,
instrumentationEnabled), 0, this.pollingInterval,
- TimeUnit.SECONDS);
+ DagManagerThread dagManagerThread = new
DagManagerThread(jobStatusRetriever, dagStateStore,
+ queue[i], cancelQueue[i], instrumentationEnabled);
+ this.dagManagerThreads[i] = dagManagerThread;
+ this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0,
this.pollingInterval, TimeUnit.SECONDS);
}
if ((this.jobStatusMonitor != null) &&
(!this.jobStatusMonitor.isRunning())) {
log.info("Starting job status monitor");
jobStatusMonitor.startAsync().awaitRunning();
}
for (Dag<JobExecutionPlan> dag : dagStateStore.getDags()) {
- offer(dag);
+ addDag(dag);
}
} else { //Mark the DagManager inactive.
log.info("Inactivating the DagManager. Shutting down all DagManager
threads");
@@ -255,12 +325,12 @@ public class DagManager extends AbstractIdleService {
try {
this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
- log.error("Exception {} encountered when shutting down DagManager
threads.", e);
+ log.error("Exception encountered when shutting down DagManager
threads.", e);
}
log.info("Shutting down JobStatusMonitor");
this.jobStatusMonitor.shutDown();
}
- } catch (IOException | ReflectiveOperationException e) {
+ } catch (IOException e) {
log.error("Exception encountered when activating the new DagManager", e);
throw new RuntimeException(e);
}
@@ -277,7 +347,7 @@ public class DagManager extends AbstractIdleService {
public static class DagManagerThread implements Runnable {
private final Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>>
jobToDag = new HashMap<>();
private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
- private final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs
= new HashMap<>();
+ final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new
HashMap<>();
private final Set<String> failedDagIdsFinishRunning = new HashSet<>();
private final Set<String> failedDagIdsFinishAllPossible = new HashSet<>();
private final MetricContext metricContext;
@@ -287,15 +357,17 @@ public class DagManager extends AbstractIdleService {
private JobStatusRetriever jobStatusRetriever;
private DagStateStore dagStateStore;
private BlockingQueue<Dag<JobExecutionPlan>> queue;
+ private BlockingQueue<String> cancelQueue;
/**
* Constructor.
*/
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore
dagStateStore,
- BlockingQueue<Dag<JobExecutionPlan>> queue, boolean
instrumentationEnabled) {
+ BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String>
cancelQueue, boolean instrumentationEnabled) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
this.queue = queue;
+ this.cancelQueue = cancelQueue;
if (instrumentationEnabled) {
this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
this.eventSubmitter = Optional.of(new
EventSubmitter.Builder(this.metricContext,
"org.apache.gobblin.service").build());
@@ -314,10 +386,15 @@ public class DagManager extends AbstractIdleService {
@Override
public void run() {
try {
- Object nextItem = queue.poll();
+ String nextDagToCancel = cancelQueue.poll();
+ //Poll the cancelQueue for a new Dag to cancel.
+ if (nextDagToCancel != null) {
+ cancelDag(nextDagToCancel);
+ }
+
+ Dag<JobExecutionPlan> dag = queue.poll();
//Poll the queue for a new Dag to execute.
- if (nextItem != null) {
- Dag<JobExecutionPlan> dag = (Dag<JobExecutionPlan>) nextItem;
+ if (dag != null) {
if (dag.isEmpty()) {
log.info("Empty dag; ignoring the dag");
}
@@ -337,6 +414,30 @@ public class DagManager extends AbstractIdleService {
}
}
+ private void cancelDag(String dagToCancel) throws ExecutionException,
InterruptedException {
+ log.info("Cancel flow with DagId {}", dagToCancel);
+ if (this.dagToJobs.containsKey(dagToCancel)) {
+ List<DagNode<JobExecutionPlan>> dagNodesToCancel =
this.dagToJobs.get(dagToCancel);
+ log.info("Found {} DagNodes to cancel.", dagNodesToCancel.size());
+ for (DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
+ Properties props = new Properties();
+ if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+ Future future = dagNodeToCancel.getValue().getJobFuture().get();
+ if (future instanceof CompletableFuture &&
+ future.get() instanceof AzkabanExecuteFlowStatus.ExecuteId) {
+ CompletableFuture<AzkabanExecuteFlowStatus.ExecuteId>
completableFuture = (CompletableFuture) future;
+ String azkabanExecId = completableFuture.get().getExecId();
+ props.put(ConfigurationKeys.AZKABAN_EXEC_ID, azkabanExecId);
+ log.info("Cancel job with azkaban exec id {}.", azkabanExecId);
+ }
+ }
+ DagManagerUtils.getSpecProducer(dagNodeToCancel).deleteSpec(null,
props);
+ }
+ } else {
+ log.warn("Did not find Dag with id {}, it might be already
cancelled.", dagToCancel);
+ }
+ }
+
/**
* This method determines the next set of jobs to execute from the dag and
submits them for execution.
* This method updates internal data structures tracking currently running
Dags and jobs.
@@ -450,8 +551,8 @@ public class DagManager extends AbstractIdleService {
}
/**
- * Obtain next dag
- * @param dagId The dagId that has been processed.
+ * Submit next set of Dag nodes in the Dag identified by the provided dagId
+ * @param dagId The dagId that should be processed.
* @return
* @throws IOException
*/
@@ -493,7 +594,9 @@ public class DagManager extends AbstractIdleService {
// The SpecProducer implementations submit the job to the underlying
executor and return when the submission is complete,
// either successfully or unsuccessfully. To catch any exceptions in
the job submission, the DagManagerThread
// blocks (by calling Future#get()) until the submission is completed.
- producer.addSpec(jobSpec).get();
+ Future addSpecFuture = producer.addSpec(jobSpec);
+ dagNode.getValue().setJobFuture(Optional.of(addSpecFuture));
+
if (this.metricContext != null) {
getRunningJobsCounter(dagNode).inc();
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 110ae0e..46fc59a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -58,9 +58,23 @@ public class DagManagerUtils {
* @return a String id associated corresponding to the {@link Dag} instance.
*/
static String generateDagId(Dag<JobExecutionPlan> dag) {
- FlowId flowId = getFlowId(dag);
- Long flowExecutionId = getFlowExecId(dag);
- return Joiner.on("_").join(flowId.getFlowGroup(), flowId.getFlowName(),
flowExecutionId);
+ return
generateDagId(dag.getStartNodes().get(0).getValue().getJobSpec().getConfig());
+ }
+
+ static String generateDagId(Dag.DagNode<JobExecutionPlan> dagNode) {
+ return generateDagId(dagNode.getValue().getJobSpec().getConfig());
+ }
+
+ private static String generateDagId(Config jobConfig) {
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+ return generateDagId(flowGroup, flowName, flowExecutionId);
+ }
+
+ static String generateDagId(String flowGroup, String flowName, long
flowExecutionId) {
+ return Joiner.on("_").join(flowGroup, flowName, flowExecutionId);
}
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index d8d5480..f987682 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service.modules.orchestration;
+import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.Collections;
@@ -283,7 +284,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
if (this.dagManager.isPresent()) {
//Send the dag to the DagManager.
- this.dagManager.get().offer(jobExecutionPlanDag);
+ this.dagManager.get().addDag(jobExecutionPlanDag);
} else {
// Schedule all compiled JobSpecs on their respective Executor
for (Dag.DagNode<JobExecutionPlan> dagNode :
jobExecutionPlanDag.getNodes()) {
@@ -341,33 +342,17 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
}
}
- public void remove(Spec spec, Properties headers) {
+ public void remove(Spec spec, Properties headers) throws IOException {
// TODO: Evolve logic to cache and reuse previously compiled JobSpecs
// .. this will work for Identity compiler but not always for multi-hop.
// Note: Current logic assumes compilation is consistent between all
executions
if (spec instanceof FlowSpec) {
- Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(spec);
-
- if (jobExecutionPlanDag.isEmpty()) {
- _log.warn("Cannot determine an executor to delete Spec: " + spec);
- return;
- }
-
- // Delete all compiled JobSpecs on their respective Executor
- for (Dag.DagNode<JobExecutionPlan> dagNode:
jobExecutionPlanDag.getNodes()) {
- JobExecutionPlan jobExecutionPlan = dagNode.getValue();
- // Delete this spec on selected executor
- SpecProducer producer = null;
- try {
- producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
- Spec jobSpec = jobExecutionPlan.getJobSpec();
-
- _log.info(String.format("Going to delete JobSpec: %s on Executor:
%s", jobSpec, producer));
- producer.deleteSpec(jobSpec.getUri(), headers);
- } catch (Exception e) {
- _log.error("Cannot successfully delete spec: " +
jobExecutionPlan.getJobSpec() + " on executor: " + producer
- + " for flow: " + spec, e);
- }
+ if (this.dagManager.isPresent()) {
+ //Send the dag to the DagManager.
+ _log.info("Forwarding cancel request for flow URI {} to DagManager.",
spec.getUri());
+ this.dagManager.get().stopDag(spec.getUri());
+ } else {
+ _log.warn("Operation not supported.");
}
} else {
throw new RuntimeException("Spec not of type FlowSpec, cannot delete: "
+ spec);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index a691811..baac7d6 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -349,7 +349,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
"Spec with URI: %s was not found in cache. May be it was cleaned,
if not please " + "clean it manually",
deletedSpecURI));
}
- } catch (JobException e) {
+ } catch (JobException | IOException e) {
_log.warn(String.format("Spec with URI: %s was not unscheduled
cleaning", deletedSpecURI), e);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index acdafea..27d76cb 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -19,10 +19,12 @@ package org.apache.gobblin.service.modules.spec;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -52,7 +54,7 @@ import org.apache.gobblin.util.ConfigUtils;
* where the {@link JobSpec} will be executed.
*/
@Data
-@EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts"})
+@EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts",
"jobFuture"})
public class JobExecutionPlan {
public static final String JOB_MAX_ATTEMPTS = "job.maxAttempts";
@@ -61,6 +63,7 @@ public class JobExecutionPlan {
private ExecutionStatus executionStatus = ExecutionStatus.PENDING;
private final int maxAttempts;
private int currentAttempts = 0;
+ private Optional<Future> jobFuture = Optional.absent();
public static class Factory {
public static final String JOB_NAME_COMPONENT_SEPARATION_CHAR = "_";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
index 11b5e3f..1f988c2 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
@@ -23,7 +23,10 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import com.google.common.base.Optional;
import com.google.gson.JsonArray;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
@@ -107,6 +110,17 @@ public class JobExecutionPlanListDeserializer implements
JsonDeserializer<List<J
JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(jobSpec,
specExecutor);
jobExecutionPlan.setExecutionStatus(executionStatus);
+
+ try {
+ String jobExecutionFuture =
serializedJobExecutionPlan.get(SerializationConstants.JOB_EXECUTION_FUTURE).getAsString();
+ Future future =
specExecutor.getProducer().get().deserializeAddSpecResponse(jobExecutionFuture);
+ jobExecutionPlan.setJobFuture(Optional.fromNullable(future));
+
+ } catch (ExecutionException | InterruptedException e) {
+ log.warn("Error during deserialization of JobExecutionFuture.");
+ throw new RuntimeException(e);
+ }
+
jobExecutionPlans.add(jobExecutionPlan);
}
return jobExecutionPlans;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
index 3de0209..5a38c19 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
@@ -83,6 +83,15 @@ public class JobExecutionPlanListSerializer implements
JsonSerializer<List<JobEx
String executionStatus = jobExecutionPlan.getExecutionStatus().name();
jobExecutionPlanJson.addProperty(SerializationConstants.EXECUTION_STATUS_KEY,
executionStatus);
+ try {
+ String jobExecutionFuture =
jobExecutionPlan.getSpecExecutor().getProducer().get()
+
.serializeAddSpecResponse(jobExecutionPlan.getJobFuture().orNull());
+
jobExecutionPlanJson.addProperty(SerializationConstants.JOB_EXECUTION_FUTURE,
jobExecutionFuture);
+ } catch (InterruptedException | ExecutionException e) {
+ log.warn("Error during serialization of JobExecutionFuture.");
+ throw new RuntimeException(e);
+ }
+
jsonArray.add(jobExecutionPlanJson);
}
return jsonArray;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
index aa8ea9e..5359465 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
@@ -31,4 +31,5 @@ public class SerializationConstants {
public static final String SPEC_EXECUTOR_URI_KEY = "uri";
public static final String EXECUTION_STATUS_KEY = "executionStatus";
+ public static final String JOB_EXECUTION_FUTURE = "jobExecutionFuture";
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
new file mode 100644
index 0000000..1300458
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicate;
+import com.typesafe.config.Config;
+
+import javax.annotation.Nullable;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import org.apache.gobblin.testing.AssertWithBackoff;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class DagManagerFlowTest {
+ DagManager dagManager;
+
+ @BeforeClass
+ public void setUp() {
+ Properties props = new Properties();
+ props.put(DagManager.JOB_STATUS_POLLING_INTERVAL_KEY, 1);
+ dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props),
false);
+ dagManager.setActive(true);
+ }
+
+ @Test
+ void testAddDeleteSpec() throws Exception {
+ Dag<JobExecutionPlan> dag1 = DagManagerTest.buildDag("0", 123456780L,
"FINISH_RUNNING", 1);
+ Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("1", 123456781L,
"FINISH_RUNNING", 1);
+ Dag<JobExecutionPlan> dag3 = DagManagerTest.buildDag("2", 123456782L,
"FINISH_RUNNING", 1);
+
+ // mock add spec
+ dagManager.addDag(dag1);
+ dagManager.addDag(dag2);
+ dagManager.addDag(dag3);
+
+ // check existence of dag in dagToJobs map
+ AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+ assertTrue(input ->
dagManager.dagManagerThreads[0].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag1)),
"Waiting for the map to update");
+ AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
+ assertTrue(input ->
dagManager.dagManagerThreads[1].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag2)),
"Waiting for the map to update");
+ AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
+ assertTrue(input ->
dagManager.dagManagerThreads[2].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag3)),
"Waiting for the map to update");
+
+ // mock delete spec
+
dagManager.stopDag(FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(new
FlowId().setFlowGroup("group0").setFlowName("flow0")));
+
dagManager.stopDag(FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(new
FlowId().setFlowGroup("group1").setFlowName("flow1")));
+
dagManager.stopDag(FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(new
FlowId().setFlowGroup("group2").setFlowName("flow2")));
+
+ // verify deleteSpec() of specProducer is called once
+
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new
DeletePredicate(dag1), "Waiting for the map to update");
+
AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new
DeletePredicate(dag2), "Waiting for the map to update");
+
AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new
DeletePredicate(dag3), "Waiting for the map to update");
+
+ // mock flow cancellation tracking event
+ Mockito.doReturn(DagManagerTest.getMockJobStatus("flow0", "group0",
123456780L, "group0", "job0", String.valueOf(
+ ExecutionStatus.CANCELLED)))
+
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow0",
"group0", 123456780L, "job0", "group0");
+
+ Mockito.doReturn(DagManagerTest.getMockJobStatus("flow1", "group1",
123456781L, "group1", "job0", String.valueOf(
+ ExecutionStatus.CANCELLED)))
+
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow1",
"group1", 123456781L, "job0", "group1");
+
+ Mockito.doReturn(DagManagerTest.getMockJobStatus("flow2", "group2",
123456782L, "group2", "job0", String.valueOf(
+ ExecutionStatus.CANCELLED)))
+
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow2",
"group2", 123456782L, "job0", "group2");
+
+ // check removal of dag in dagToJobs map
+ AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+ assertTrue(input ->
!dagManager.dagManagerThreads[0].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag1)),
"Waiting for the map to update");
+ AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
+ assertTrue(input ->
!dagManager.dagManagerThreads[1].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag2)),
"Waiting for the map to update");
+ AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
+ assertTrue(input ->
!dagManager.dagManagerThreads[2].dagToJobs.containsKey(DagManagerUtils.generateDagId(dag3)),
"Waiting for the map to update");
+ }
+}
+
+class DeletePredicate implements Predicate {
+ private final Dag<JobExecutionPlan> dag;
+ public DeletePredicate(Dag<JobExecutionPlan> dag) {
+ this.dag = dag;
+ }
+
+ @Override
+ public boolean apply(@Nullable Object input) {
+ try {
+
verify(dag.getNodes().get(0).getValue().getSpecExecutor().getProducer().get()).deleteSpec(any(),
any());
+ } catch (Throwable e) {
+ return false;
+ }
+ return true;
+ }
+}
+
+class MockedDagManager extends DagManager {
+
+ public MockedDagManager(Config config, boolean instrumentationEnabled) {
+ super(config, instrumentationEnabled);
+ }
+
+ @Override
+ JobStatusRetriever createJobStatusRetriever(Config config) {
+ JobStatusRetriever mockedJbStatusRetriever =
Mockito.mock(JobStatusRetriever.class);
+
Mockito.doReturn(Collections.emptyIterator()).when(mockedJbStatusRetriever).getJobStatusesForFlowExecution(anyString(),
anyString(), anyLong(), anyString(), anyString());
+ when(mockedJbStatusRetriever.getLatestExecutionIdsForFlow(eq("flow0"),
eq("group0"), anyInt())).thenReturn(Collections.singletonList(123456780L));
+ when(mockedJbStatusRetriever.getLatestExecutionIdsForFlow(eq("flow1"),
eq("group1"), anyInt())).thenReturn(Collections.singletonList(123456781L));
+ when(mockedJbStatusRetriever.getLatestExecutionIdsForFlow(eq("flow2"),
eq("group2"), anyInt())).thenReturn(Collections.singletonList(123456782L));
+ return mockedJbStatusRetriever;
+ }
+
+ @Override
+ KafkaJobStatusMonitor createJobStatusMonitor(Config config) {
+ return null;
+ }
+
+ @Override
+ DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec>
topologySpecMap) {
+ DagStateStore mockedDagStateStore = Mockito.mock(DagStateStore.class);
+
+ try {
+ doNothing().when(mockedDagStateStore).writeCheckpoint(any());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return mockedDagStateStore;
+ }
+}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 0165bb2..c250a49 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -46,7 +46,7 @@ import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
@@ -62,6 +62,7 @@ public class DagManagerTest {
private JobStatusRetriever _jobStatusRetriever;
private DagManager.DagManagerThread _dagManagerThread;
private LinkedBlockingQueue<Dag<JobExecutionPlan>> queue;
+ private LinkedBlockingQueue<String> cancelQueue;
private Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag;
private Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs;
private Map<String, Dag<JobExecutionPlan>> dags;
@@ -75,7 +76,9 @@ public class DagManagerTest {
this._dagStateStore = new FSDagStateStore(config, new HashMap<>());
this._jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
this.queue = new LinkedBlockingQueue<>();
- this._dagManagerThread = new
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, queue, true);
+ this.cancelQueue = new LinkedBlockingQueue<>();
+ this._dagManagerThread = new
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, queue,
cancelQueue,
+ true);
Field jobToDagField =
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
jobToDagField.setAccessible(true);
@@ -95,10 +98,16 @@ public class DagManagerTest {
* Create a {@link Dag <JobExecutionPlan>}.
* @return a Dag.
*/
- private Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId,
String flowFailureOption, boolean flag)
+ static Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId,
String flowFailureOption, boolean flag)
throws URISyntaxException {
- List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
int numNodes = (flag) ? 3 : 5;
+ return buildDag(id, flowExecutionId, flowFailureOption, numNodes);
+ }
+
+ static Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId,
String flowFailureOption, int numNodes)
+ throws URISyntaxException {
+ List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+
for (int i = 0; i < numNodes; i++) {
String suffix = Integer.toString(i);
Config jobConfig = ConfigBuilder.create().
@@ -117,18 +126,18 @@ public class DagManagerTest {
}
JobSpec js = JobSpec.builder("test_job" +
suffix).withVersion(suffix).withConfig(jobConfig).
withTemplate(new URI("job" + suffix)).build();
- SpecExecutor specExecutor =
InMemorySpecExecutor.createDummySpecExecutor(new URI("job" + i));
+ SpecExecutor specExecutor =
MockedSpecExecutor.createDummySpecExecutor(new URI("job" + i));
JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js,
specExecutor);
jobExecutionPlans.add(jobExecutionPlan);
}
return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
}
- private Iterator<JobStatus> getMockJobStatus(String flowName, String
flowGroup, Long flowExecutionId, String jobGroup, String jobName, String
eventName) {
+ static Iterator<JobStatus> getMockJobStatus(String flowName, String
flowGroup, Long flowExecutionId, String jobGroup, String jobName, String
eventName) {
return getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup,
jobName, eventName, false);
}
- private Iterator<JobStatus> getMockJobStatus(String flowName, String
flowGroup, Long flowExecutionId, String jobGroup, String jobName, String
eventName, boolean shouldRetry) {
+ private static Iterator<JobStatus> getMockJobStatus(String flowName, String
flowGroup, Long flowExecutionId, String jobGroup, String jobName, String
eventName, boolean shouldRetry) {
return
Iterators.singletonIterator(JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(jobGroup).jobName(jobName).flowExecutionId(flowExecutionId).
message("Test
message").eventName(eventName).startTime(5000L).shouldRetry(shouldRetry).build());
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java
index f63c6ad..e87c4d6 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java
@@ -23,6 +23,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.JobSpec;
@@ -33,11 +39,8 @@ import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.util.CompletedFuture;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.hadoop.fs.Path;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
public class DagTestUtils {
@@ -86,6 +89,11 @@ public class DagTestUtils {
SpecExecutor specExecutor =
buildNaiveTopologySpec("mySpecExecutor").getSpecExecutor();
JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js,
specExecutor);
jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
+
+ // Future of type CompletedFuture is used because in tests
InMemorySpecProducer is used and that responds with CompletedFuture
+ CompletedFuture future = new CompletedFuture<>(Boolean.TRUE, null);
+ jobExecutionPlan.setJobFuture(Optional.of(future));
+
jobExecutionPlans.add(jobExecutionPlan);
}
return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
index a38929a..b2fa5f5 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
@@ -110,7 +111,7 @@ public class FSDagStateStoreTest {
}
@Test (dependsOnMethods = "testCleanUp")
- public void testGetDags() throws IOException, URISyntaxException {
+ public void testGetDags() throws IOException, URISyntaxException,
ExecutionException, InterruptedException {
//Set up a new FSDagStateStore instance.
setUp();
List<Long> flowExecutionIds =
Lists.newArrayList(System.currentTimeMillis(), System.currentTimeMillis() + 1);
@@ -129,6 +130,8 @@ public class FSDagStateStoreTest {
Assert.assertEquals(dag.getParentChildMap().size(), 1);
Assert.assertEquals(dag.getNodes().get(0).getValue().getSpecExecutor().getUri(),
specExecURI);
Assert.assertEquals(dag.getNodes().get(1).getValue().getSpecExecutor().getUri(),
specExecURI);
+
Assert.assertTrue(Boolean.parseBoolean(dag.getNodes().get(0).getValue().getJobFuture().get().get().toString()));
+
Assert.assertTrue(Boolean.parseBoolean(dag.getNodes().get(1).getValue().getJobFuture().get().get().toString()));
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
index 6ad85d6..a4d39e6 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreTest.java
@@ -23,6 +23,12 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.dbcp.BasicDataSource;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
@@ -34,11 +40,6 @@ import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.typesafe.config.Config;
/**
@@ -97,6 +98,8 @@ public class MysqlDagStateStoreTest {
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY),
"flow" + "random_0");
Assert.assertEquals(jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
123L);
Assert.assertEquals(plan.getExecutionStatus(), ExecutionStatus.RUNNING);
+
Assert.assertTrue(Boolean.parseBoolean(plan.getJobFuture().get().get().toString()));
+
Assert.assertTrue(Boolean.parseBoolean(plan.getJobFuture().get().get().toString()));
}
dagDeserialized = dags.get(1);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
index ad48f34..14668ee 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
@@ -28,7 +28,6 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;