This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0b253d5 [GOBBLIN-687] Pass TopologySpec map to DagManager to allow
reuse of Sp…
0b253d5 is described below
commit 0b253d5b834e4bc2eb3ca115919a4bdf50ddc52c
Author: suvasude <[email protected]>
AuthorDate: Wed Feb 20 15:01:13 2019 -0800
[GOBBLIN-687] Pass TopologySpec map to DagManager to allow reuse of Sp…
Closes #2559 from sv2000/dagManagerTopologySpecMap
---
.../modules/core/GobblinServiceManager.java | 40 ++++++++++------------
.../service/modules/orchestration/DagManager.java | 31 +++++++++++------
.../modules/orchestration/FSDagStateStore.java | 9 ++---
.../modules/orchestration/Orchestrator.java | 30 ++++++++--------
.../spec/JobExecutionPlanListDeserializer.java | 18 ++++++----
.../modules/spec/SerializationConstants.java | 1 +
.../modules/orchestration/DagManagerTest.java | 4 ++-
.../modules/orchestration/FSDagStateStoreTest.java | 39 +++++++++++++++++++--
8 files changed, 113 insertions(+), 59 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 3e3429b..d96d6d2 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -188,24 +188,14 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
this.isGitConfigMonitorEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY,
false);
- this.isDagManagerEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, false);
-
if (this.isGitConfigMonitorEnabled) {
this.gitConfigMonitor = new GitConfigMonitor(config, this.flowCatalog);
this.serviceLauncher.addService(this.gitConfigMonitor);
}
-
- if (this.isDagManagerEnabled) {
- this.dagManager = new DagManager(config);
- this.serviceLauncher.addService(this.dagManager);
- }
} else {
this.isGitConfigMonitorEnabled = false;
- this.isDagManagerEnabled = false;
}
-
-
// Initialize Helix
Optional<String> zkConnectionString =
Optional.fromNullable(ConfigUtils.getString(config,
ServiceConfigKeys.ZK_CONNECTION_STRING_KEY, null));
@@ -218,6 +208,13 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
this.helixManager = Optional.absent();
}
+ this.isDagManagerEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, false);
+ // Initialize DagManager
+ if (this.isDagManagerEnabled) {
+ this.dagManager = new DagManager(config);
+ this.serviceLauncher.addService(this.dagManager);
+ }
+
// Initialize ServiceScheduler
this.isSchedulerEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY, true);
@@ -339,7 +336,7 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
* Handle leadership change.
* @param changeContext notification context
*/
- private void handleLeadershipChange(NotificationContext changeContext) {
+ private void handleLeadershipChange(NotificationContext changeContext) {
if (this.helixManager.isPresent() && this.helixManager.get().isLeader()) {
LOGGER.info("Leader notification for {} HM.isLeader {}",
this.helixManager.get().getInstanceName(),
this.helixManager.get().isLeader());
@@ -354,7 +351,10 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
}
if (this.isDagManagerEnabled) {
- this.dagManager.setActive(true);
+ //Activate DagManager only if TopologyCatalog is initialized. If not;
skip activation.
+ if (this.topologyCatalog.getInitComplete().getCount() == 0) {
+ this.dagManager.setActive(true);
+ }
}
} else if (this.helixManager.isPresent()) {
LOGGER.info("Leader lost notification for {} HM.isLeader {}",
this.helixManager.get().getInstanceName(),
@@ -405,10 +405,6 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
this.gitConfigMonitor.setActive(true);
}
- if (this.isDagManagerEnabled) {
- this.dagManager.setActive(true);
- }
-
} else {
if (this.isSchedulerEnabled) {
LOGGER.info("[Init] Gobblin Service is running in slave instance
mode, not enabling Scheduler.");
@@ -423,11 +419,6 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
if (this.isGitConfigMonitorEnabled) {
this.gitConfigMonitor.setActive(true);
}
-
- if (this.isDagManagerEnabled) {
- this.dagManager.setActive(true);
- }
-
}
// Populate TopologyCatalog with all Topologies generated by
TopologySpecFactory
@@ -450,6 +441,13 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
//Activate the SpecCompiler, after the topologyCatalog has been
initialized.
this.orchestrator.getSpecCompiler().setActive(true);
+
+ //Activate the DagManager service, after the topologyCatalog has been
initialized.
+ if (!this.helixManager.isPresent() || this.helixManager.get().isLeader()){
+ if (this.isDagManagerEnabled) {
+ this.dagManager.setActive(true);
+ }
+ }
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 04a4c4e..3a7c232 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -51,6 +52,7 @@ import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceMetricNames;
import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -134,16 +136,19 @@ public class DagManager extends AbstractIdleService {
private BlockingQueue<Dag<JobExecutionPlan>> queue;
private ScheduledExecutorService scheduledExecutorPool;
private boolean instrumentationEnabled;
+ private DagStateStore dagStateStore;
+ private Map<URI, TopologySpec> topologySpecMap;
private final Integer numThreads;
private final Integer pollingInterval;
private final JobStatusRetriever jobStatusRetriever;
private final KafkaJobStatusMonitor jobStatusMonitor;
- private final DagStateStore dagStateStore;
+ private final Config config;
private volatile boolean isActive = false;
public DagManager(Config config, boolean instrumentationEnabled) {
+ this.config = config;
this.queue = new LinkedBlockingDeque<>();
this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY,
DEFAULT_NUM_THREADS);
this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
@@ -162,10 +167,7 @@ public class DagManager extends AbstractIdleService {
this.jobStatusMonitor = null;
}
this.jobStatusRetriever =
- (JobStatusRetriever)
GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass,
- ConfigUtils.getConfigOrEmpty(config, JOB_STATUS_RETRIEVER_KEY));
- Class dagStateStoreClass =
Class.forName(config.getString(DAG_STATESTORE_CLASS_KEY));
- this.dagStateStore = (DagStateStore)
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config);
+ (JobStatusRetriever)
GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass,
config);
} catch (ReflectiveOperationException e) {
throw new RuntimeException("Exception encountered during DagManager
initialization", e);
}
@@ -197,6 +199,10 @@ public class DagManager extends AbstractIdleService {
}
}
+ public synchronized void setTopologySpecMap(Map<URI, TopologySpec>
topologySpecMap) {
+ this.topologySpecMap = topologySpecMap;
+ }
+
/**
* When a {@link DagManager} becomes active, it loads the serialized
representations of the currently running {@link Dag}s
* from the checkpoint directory, deserializes the {@link Dag}s and adds
them to a queue to be consumed by
@@ -211,7 +217,12 @@ public class DagManager extends AbstractIdleService {
this.isActive = active;
try {
if (this.isActive) {
+ log.info("Activating DagManager.");
log.info("Scheduling {} DagManager threads", numThreads);
+ //Initializing state store for persisting Dags.
+ Class dagStateStoreClass = Class.forName(ConfigUtils.getString(config,
DAG_STATESTORE_CLASS_KEY, FSDagStateStore.class.getName()));
+ this.dagStateStore = (DagStateStore)
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config,
topologySpecMap);
+
//On startup, the service creates DagManagerThreads that are scheduled
at a fixed rate.
for (int i = 0; i < numThreads; i++) {
this.scheduledExecutorPool.scheduleAtFixedRate(new
DagManagerThread(jobStatusRetriever, dagStateStore, queue,
instrumentationEnabled), 0, this.pollingInterval,
@@ -235,7 +246,7 @@ public class DagManager extends AbstractIdleService {
log.info("Shutting down JobStatusMonitor");
this.jobStatusMonitor.shutDown();
}
- } catch (IOException e) {
+ } catch (IOException | ReflectiveOperationException e) {
log.error("Exception encountered when activating the new DagManager", e);
throw new RuntimeException(e);
}
@@ -300,14 +311,14 @@ public class DagManager extends AbstractIdleService {
//Initialize dag.
initialize(dag);
}
- log.info("Polling job statuses..");
+ log.debug("Polling job statuses..");
//Poll and update the job statuses of running jobs.
pollJobStatuses();
- log.info("Poll done.");
+ log.debug("Poll done.");
//Clean up any finished dags
- log.info("Cleaning up finished dags..");
+ log.debug("Cleaning up finished dags..");
cleanUp();
- log.info("Clean up done");
+ log.debug("Clean up done");
} catch (Exception e) {
log.error("Exception encountered in {}", getClass().getName(), e);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
index e647fed..5473c8e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -20,7 +20,9 @@ package org.apache.gobblin.service.modules.orchestration;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Type;
+import java.net.URI;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
@@ -37,6 +39,7 @@ import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
@@ -48,8 +51,6 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
@Slf4j
public class FSDagStateStore implements DagStateStore {
public static final String DAG_FILE_EXTENSION = ".dag";
- /** Use gson for ser/de */
- //private static final Gson GSON =
GsonInterfaceAdapter.getGson(Object.class);
/** Type token for ser/de JobExecutionPlan list */
private static final Type LIST_JOBEXECUTIONPLAN_TYPE = new
TypeToken<List<JobExecutionPlan>>(){}.getType();
@@ -57,7 +58,7 @@ public class FSDagStateStore implements DagStateStore {
private final String dagCheckpointDir;
private final Gson gson;
- public FSDagStateStore(Config config) throws IOException {
+ public FSDagStateStore(Config config, Map<URI, TopologySpec>
topologySpecMap) throws IOException {
this.dagCheckpointDir = config.getString(DagManager.DAG_STATESTORE_DIR);
File checkpointDir = new File(this.dagCheckpointDir);
if (!checkpointDir.exists()) {
@@ -67,7 +68,7 @@ public class FSDagStateStore implements DagStateStore {
}
JsonSerializer<List<JobExecutionPlan>> serializer = new
JobExecutionPlanListSerializer();
- JsonDeserializer<List<JobExecutionPlan>> deserializer = new
JobExecutionPlanListDeserializer();
+ JsonDeserializer<List<JobExecutionPlan>> deserializer = new
JobExecutionPlanListDeserializer(topologySpecMap);
this.gson = new
GsonBuilder().registerTypeAdapter(LIST_JOBEXECUTIONPLAN_TYPE, serializer)
.registerTypeAdapter(LIST_JOBEXECUTIONPLAN_TYPE,
deserializer).create();
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index fbc38e6..3b838fe 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -25,8 +25,9 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-import com.google.common.collect.Maps;
-import javax.annotation.Nonnull;
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
@@ -34,36 +35,32 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.typesafe.config.Config;
+import javax.annotation.Nonnull;
+import lombok.Getter;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumentable;
+import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.ServiceMetricNames;
-import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
-import org.slf4j.LoggerFactory;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecProducer;
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.configuration.State;
-import org.slf4j.Logger;
-
-import lombok.Getter;
/**
@@ -108,6 +105,11 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
throw new RuntimeException(e);
}
+ //At this point, the TopologySpecMap is initialized by the SpecCompiler.
Pass the TopologySpecMap to the DagManager.
+ if (this.dagManager.isPresent()) {
+
this.dagManager.get().setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
+ }
+
if (instrumentationEnabled) {
this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(config),
this.specCompiler.getClass());
this.flowOrchestrationSuccessFulMeter =
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER));
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
index 0ab13ba..11b5e3f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
@@ -22,6 +22,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import com.google.gson.JsonArray;
import com.google.gson.JsonDeserializationContext;
@@ -36,12 +37,18 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@Slf4j
public class JobExecutionPlanListDeserializer implements
JsonDeserializer<List<JobExecutionPlan>> {
+ private final Map<URI,TopologySpec> topologySpecMap;
+
+ public JobExecutionPlanListDeserializer(Map<URI, TopologySpec>
topologySpecMap) {
+ this.topologySpecMap = topologySpecMap;
+ }
+
/**
* Gson invokes this call-back method during deserialization when it
encounters a field of the
* specified type.
@@ -89,13 +96,12 @@ public class JobExecutionPlanListDeserializer implements
JsonDeserializer<List<J
}
Config specExecutorConfig =
ConfigFactory.parseString(specExecutorJson.get(SerializationConstants.SPEC_EXECUTOR_CONFIG_KEY).getAsString());
- String className =
specExecutorJson.get(SerializationConstants.SPEC_EXECUTOR_CLASS_KEY).getAsString();
SpecExecutor specExecutor;
try {
- specExecutor =
- (SpecExecutor)
GobblinConstructorUtils.invokeLongestConstructor(Class.forName(className),
specExecutorConfig);
- } catch (ReflectiveOperationException e) {
- log.error("Error deserializing specExecuor {}", specExecutorConfig);
+ URI specExecutorUri = new
URI(specExecutorConfig.getString(SerializationConstants.SPEC_EXECUTOR_URI_KEY));
+ specExecutor =
this.topologySpecMap.get(specExecutorUri).getSpecExecutor();
+ } catch (Exception e) {
+ log.error("Error deserializing specExecutor {}", specExecutorConfig);
throw new RuntimeException(e);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
index ecffad9..aa8ea9e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
@@ -28,6 +28,7 @@ public class SerializationConstants {
public static final String SPEC_EXECUTOR_CONFIG_KEY = "config";
public static final String SPEC_EXECUTOR_CLASS_KEY = "class";
public static final String SPEC_EXECUTOR_KEY = "specExecutor";
+ public static final String SPEC_EXECUTOR_URI_KEY = "uri";
public static final String EXECUTION_STATUS_KEY = "executionStatus";
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 620c2ee..467ac84 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -70,7 +71,8 @@ public class DagManagerTest {
FileUtils.deleteDirectory(new File(this.dagStateStoreDir));
Config config = ConfigFactory.empty()
.withValue(DagManager.DAG_STATESTORE_DIR,
ConfigValueFactory.fromAnyRef(this.dagStateStoreDir));
- this._dagStateStore = new FSDagStateStore(config);
+
+ this._dagStateStore = new FSDagStateStore(config, new HashMap<>());
this._jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
this.queue = new LinkedBlockingQueue<>();
this._dagManagerThread = new
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, queue, true);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
index 073d091..8611b66 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
@@ -21,9 +21,13 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -38,25 +42,52 @@ import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.util.ConfigUtils;
public class FSDagStateStoreTest {
private DagStateStore _dagStateStore;
private final String dagStateStoreDir =
"/tmp/fsDagStateStoreTest/dagStateStore";
private File checkpointDir;
+ private Map<URI, TopologySpec> topologySpecMap;
+ private TopologySpec topologySpec;
+ private URI specExecURI;
@BeforeClass
- public void setUp() throws IOException {
+ public void setUp()
+ throws IOException, URISyntaxException {
this.checkpointDir = new File(dagStateStoreDir);
FileUtils.deleteDirectory(this.checkpointDir);
Config config =
ConfigFactory.empty().withValue(DagManager.DAG_STATESTORE_DIR,
ConfigValueFactory.fromAnyRef(
this.dagStateStoreDir));
- this._dagStateStore = new FSDagStateStore(config);
+ this.topologySpecMap = new HashMap<>();
+
+ String specStoreDir = "/tmp/specStoreDir";
+ String specExecInstance = "mySpecExecutor";
+ Properties properties = new Properties();
+ properties.put("specStore.fs.dir", specStoreDir);
+ properties.put("specExecInstance.capabilities", "source:destination");
+ properties.put("specExecInstance.uri", specExecInstance);
+ properties.put("uri",specExecInstance);
+
+ Config specExecConfig = ConfigUtils.propertiesToConfig(properties);
+ SpecExecutor specExecutorInstanceProducer = new
InMemorySpecExecutor(specExecConfig);
+ TopologySpec.Builder topologySpecBuilder = TopologySpec.builder(new
Path(specStoreDir).toUri())
+ .withConfig(specExecConfig)
+ .withDescription("test")
+ .withVersion("1")
+ .withSpecExecutor(specExecutorInstanceProducer);
+ this.topologySpec = topologySpecBuilder.build();
+ this.specExecURI = new URI(specExecInstance);
+ this.topologySpecMap.put(this.specExecURI, topologySpec);
+
+ this._dagStateStore = new FSDagStateStore(config, this.topologySpecMap);
}
/**
@@ -77,7 +108,7 @@ public class FSDagStateStoreTest {
}
JobSpec js = JobSpec.builder("test_job" +
suffix).withVersion(suffix).withConfig(jobConfig).
withTemplate(new URI("job" + suffix)).build();
- SpecExecutor specExecutor = new
InMemorySpecExecutor(ConfigFactory.empty());
+ SpecExecutor specExecutor = this.topologySpec.getSpecExecutor();
JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js,
specExecutor);
jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
jobExecutionPlans.add(jobExecutionPlan);
@@ -144,6 +175,8 @@ public class FSDagStateStoreTest {
Assert.assertEquals(dag.getStartNodes().size(), 1);
Assert.assertEquals(dag.getEndNodes().size(), 1);
Assert.assertEquals(dag.getParentChildMap().size(), 1);
+
Assert.assertEquals(dag.getNodes().get(0).getValue().getSpecExecutor().getUri(),
specExecURI);
+
Assert.assertEquals(dag.getNodes().get(1).getValue().getSpecExecutor().getUri(),
specExecURI);
}
}