This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b253d5  [GOBBLIN-687] Pass TopologySpec map to DagManager to allow 
reuse of Sp…
0b253d5 is described below

commit 0b253d5b834e4bc2eb3ca115919a4bdf50ddc52c
Author: suvasude <[email protected]>
AuthorDate: Wed Feb 20 15:01:13 2019 -0800

    [GOBBLIN-687] Pass TopologySpec map to DagManager to allow reuse of Sp…
    
    Closes #2559 from sv2000/dagManagerTopologySpecMap
---
 .../modules/core/GobblinServiceManager.java        | 40 ++++++++++------------
 .../service/modules/orchestration/DagManager.java  | 31 +++++++++++------
 .../modules/orchestration/FSDagStateStore.java     |  9 ++---
 .../modules/orchestration/Orchestrator.java        | 30 ++++++++--------
 .../spec/JobExecutionPlanListDeserializer.java     | 18 ++++++----
 .../modules/spec/SerializationConstants.java       |  1 +
 .../modules/orchestration/DagManagerTest.java      |  4 ++-
 .../modules/orchestration/FSDagStateStoreTest.java | 39 +++++++++++++++++++--
 8 files changed, 113 insertions(+), 59 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 3e3429b..d96d6d2 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -188,24 +188,14 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
       this.isGitConfigMonitorEnabled = ConfigUtils.getBoolean(config,
           ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY, 
false);
 
-      this.isDagManagerEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, false);
-
       if (this.isGitConfigMonitorEnabled) {
         this.gitConfigMonitor = new GitConfigMonitor(config, this.flowCatalog);
         this.serviceLauncher.addService(this.gitConfigMonitor);
       }
-
-      if (this.isDagManagerEnabled) {
-        this.dagManager = new DagManager(config);
-        this.serviceLauncher.addService(this.dagManager);
-      }
     } else {
       this.isGitConfigMonitorEnabled = false;
-      this.isDagManagerEnabled = false;
     }
 
-
-
     // Initialize Helix
     Optional<String> zkConnectionString = 
Optional.fromNullable(ConfigUtils.getString(config,
         ServiceConfigKeys.ZK_CONNECTION_STRING_KEY, null));
@@ -218,6 +208,13 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
       this.helixManager = Optional.absent();
     }
 
+    this.isDagManagerEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, false);
+    // Initialize DagManager
+    if (this.isDagManagerEnabled) {
+      this.dagManager = new DagManager(config);
+      this.serviceLauncher.addService(this.dagManager);
+    }
+
     // Initialize ServiceScheduler
     this.isSchedulerEnabled = ConfigUtils.getBoolean(config,
         ServiceConfigKeys.GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY, true);
@@ -339,7 +336,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
    * Handle leadership change.
    * @param changeContext notification context
    */
-  private void handleLeadershipChange(NotificationContext changeContext) {
+  private void  handleLeadershipChange(NotificationContext changeContext) {
     if (this.helixManager.isPresent() && this.helixManager.get().isLeader()) {
       LOGGER.info("Leader notification for {} HM.isLeader {}", 
this.helixManager.get().getInstanceName(),
           this.helixManager.get().isLeader());
@@ -354,7 +351,10 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
       }
 
       if (this.isDagManagerEnabled) {
-        this.dagManager.setActive(true);
+        //Activate DagManager only if TopologyCatalog is initialized. If not; 
skip activation.
+        if (this.topologyCatalog.getInitComplete().getCount() == 0) {
+          this.dagManager.setActive(true);
+        }
       }
     } else if (this.helixManager.isPresent()) {
       LOGGER.info("Leader lost notification for {} HM.isLeader {}", 
this.helixManager.get().getInstanceName(),
@@ -405,10 +405,6 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
           this.gitConfigMonitor.setActive(true);
         }
 
-        if (this.isDagManagerEnabled) {
-          this.dagManager.setActive(true);
-        }
-
       } else {
         if (this.isSchedulerEnabled) {
           LOGGER.info("[Init] Gobblin Service is running in slave instance 
mode, not enabling Scheduler.");
@@ -423,11 +419,6 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
       if (this.isGitConfigMonitorEnabled) {
         this.gitConfigMonitor.setActive(true);
       }
-
-      if (this.isDagManagerEnabled) {
-        this.dagManager.setActive(true);
-      }
-
     }
 
     // Populate TopologyCatalog with all Topologies generated by 
TopologySpecFactory
@@ -450,6 +441,13 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
 
     //Activate the SpecCompiler, after the topologyCatalog has been 
initialized.
     this.orchestrator.getSpecCompiler().setActive(true);
+
+    //Activate the DagManager service, after the topologyCatalog has been 
initialized.
+    if (!this.helixManager.isPresent() || this.helixManager.get().isLeader()){
+      if (this.isDagManagerEnabled) {
+        this.dagManager.setActive(true);
+      }
+    }
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 04a4c4e..3a7c232 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -51,6 +52,7 @@ import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.ServiceMetricNames;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -134,16 +136,19 @@ public class DagManager extends AbstractIdleService {
   private BlockingQueue<Dag<JobExecutionPlan>> queue;
   private ScheduledExecutorService scheduledExecutorPool;
   private boolean instrumentationEnabled;
+  private DagStateStore dagStateStore;
+  private Map<URI, TopologySpec> topologySpecMap;
 
   private final Integer numThreads;
   private final Integer pollingInterval;
   private final JobStatusRetriever jobStatusRetriever;
   private final KafkaJobStatusMonitor jobStatusMonitor;
-  private final DagStateStore dagStateStore;
+  private final Config config;
 
   private volatile boolean isActive = false;
 
   public DagManager(Config config, boolean instrumentationEnabled) {
+    this.config = config;
     this.queue = new LinkedBlockingDeque<>();
     this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
     this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
@@ -162,10 +167,7 @@ public class DagManager extends AbstractIdleService {
         this.jobStatusMonitor = null;
       }
       this.jobStatusRetriever =
-          (JobStatusRetriever) 
GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass,
-              ConfigUtils.getConfigOrEmpty(config, JOB_STATUS_RETRIEVER_KEY));
-      Class dagStateStoreClass = 
Class.forName(config.getString(DAG_STATESTORE_CLASS_KEY));
-      this.dagStateStore = (DagStateStore) 
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config);
+          (JobStatusRetriever) 
GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, 
config);
     } catch (ReflectiveOperationException e) {
       throw new RuntimeException("Exception encountered during DagManager 
initialization", e);
     }
@@ -197,6 +199,10 @@ public class DagManager extends AbstractIdleService {
     }
   }
 
+  public synchronized void setTopologySpecMap(Map<URI, TopologySpec> 
topologySpecMap) {
+    this.topologySpecMap = topologySpecMap;
+  }
+
   /**
    * When a {@link DagManager} becomes active, it loads the serialized 
representations of the currently running {@link Dag}s
    * from the checkpoint directory, deserializes the {@link Dag}s and adds 
them to a queue to be consumed by
@@ -211,7 +217,12 @@ public class DagManager extends AbstractIdleService {
     this.isActive = active;
     try {
       if (this.isActive) {
+        log.info("Activating DagManager.");
         log.info("Scheduling {} DagManager threads", numThreads);
+        //Initializing state store for persisting Dags.
+        Class dagStateStoreClass = Class.forName(ConfigUtils.getString(config, 
DAG_STATESTORE_CLASS_KEY, FSDagStateStore.class.getName()));
+        this.dagStateStore = (DagStateStore) 
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, 
topologySpecMap);
+
         //On startup, the service creates DagManagerThreads that are scheduled 
at a fixed rate.
         for (int i = 0; i < numThreads; i++) {
           this.scheduledExecutorPool.scheduleAtFixedRate(new 
DagManagerThread(jobStatusRetriever, dagStateStore, queue, 
instrumentationEnabled), 0, this.pollingInterval,
@@ -235,7 +246,7 @@ public class DagManager extends AbstractIdleService {
         log.info("Shutting down JobStatusMonitor");
         this.jobStatusMonitor.shutDown();
       }
-    } catch (IOException e) {
+    } catch (IOException | ReflectiveOperationException e) {
       log.error("Exception encountered when activating the new DagManager", e);
       throw new RuntimeException(e);
     }
@@ -300,14 +311,14 @@ public class DagManager extends AbstractIdleService {
           //Initialize dag.
           initialize(dag);
         }
-        log.info("Polling job statuses..");
+        log.debug("Polling job statuses..");
         //Poll and update the job statuses of running jobs.
         pollJobStatuses();
-        log.info("Poll done.");
+        log.debug("Poll done.");
         //Clean up any finished dags
-        log.info("Cleaning up finished dags..");
+        log.debug("Cleaning up finished dags..");
         cleanUp();
-        log.info("Clean up done");
+        log.debug("Clean up done");
       } catch (Exception e) {
         log.error("Exception encountered in {}", getClass().getName(), e);
       }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
index e647fed..5473c8e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -20,7 +20,9 @@ package org.apache.gobblin.service.modules.orchestration;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Type;
+import java.net.URI;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -37,6 +39,7 @@ import com.typesafe.config.Config;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
@@ -48,8 +51,6 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
 @Slf4j
 public class FSDagStateStore implements DagStateStore {
   public static final String DAG_FILE_EXTENSION = ".dag";
-  /** Use gson for ser/de */
-  //private static final Gson GSON = 
GsonInterfaceAdapter.getGson(Object.class);
 
   /** Type token for ser/de JobExecutionPlan list */
   private static final Type LIST_JOBEXECUTIONPLAN_TYPE = new 
TypeToken<List<JobExecutionPlan>>(){}.getType();
@@ -57,7 +58,7 @@ public class FSDagStateStore implements DagStateStore {
   private final String dagCheckpointDir;
   private final Gson gson;
 
-  public FSDagStateStore(Config config) throws IOException {
+  public FSDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap) throws IOException {
     this.dagCheckpointDir = config.getString(DagManager.DAG_STATESTORE_DIR);
     File checkpointDir = new File(this.dagCheckpointDir);
     if (!checkpointDir.exists()) {
@@ -67,7 +68,7 @@ public class FSDagStateStore implements DagStateStore {
     }
 
     JsonSerializer<List<JobExecutionPlan>> serializer = new 
JobExecutionPlanListSerializer();
-    JsonDeserializer<List<JobExecutionPlan>> deserializer = new 
JobExecutionPlanListDeserializer();
+    JsonDeserializer<List<JobExecutionPlan>> deserializer = new 
JobExecutionPlanListDeserializer(topologySpecMap);
     this.gson = new 
GsonBuilder().registerTypeAdapter(LIST_JOBEXECUTIONPLAN_TYPE, serializer)
         .registerTypeAdapter(LIST_JOBEXECUTIONPLAN_TYPE, 
deserializer).create();
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index fbc38e6..3b838fe 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -25,8 +25,9 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.Maps;
-import javax.annotation.Nonnull;
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
@@ -34,36 +35,32 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.typesafe.config.Config;
 
+import javax.annotation.Nonnull;
+import lombok.Getter;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.Instrumentable;
+import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.ServiceMetricNames;
-import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
-import org.slf4j.LoggerFactory;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecProducer;
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.configuration.State;
-import org.slf4j.Logger;
-
-import lombok.Getter;
 
 
 /**
@@ -108,6 +105,11 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
       throw new RuntimeException(e);
     }
 
+    //At this point, the TopologySpecMap is initialized by the SpecCompiler. 
Pass the TopologySpecMap to the DagManager.
+    if (this.dagManager.isPresent()) {
+      
this.dagManager.get().setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
+    }
+
     if (instrumentationEnabled) {
       this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
this.specCompiler.getClass());
       this.flowOrchestrationSuccessFulMeter = 
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER));
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
index 0ab13ba..11b5e3f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
@@ -22,6 +22,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import com.google.gson.JsonArray;
 import com.google.gson.JsonDeserializationContext;
@@ -36,12 +37,18 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
 @Slf4j
 public class JobExecutionPlanListDeserializer implements 
JsonDeserializer<List<JobExecutionPlan>> {
+  private final Map<URI,TopologySpec> topologySpecMap;
+
+  public JobExecutionPlanListDeserializer(Map<URI, TopologySpec> 
topologySpecMap) {
+    this.topologySpecMap = topologySpecMap;
+  }
+
   /**
    * Gson invokes this call-back method during deserialization when it 
encounters a field of the
    * specified type.
@@ -89,13 +96,12 @@ public class JobExecutionPlanListDeserializer implements 
JsonDeserializer<List<J
       }
 
       Config specExecutorConfig = 
ConfigFactory.parseString(specExecutorJson.get(SerializationConstants.SPEC_EXECUTOR_CONFIG_KEY).getAsString());
-      String className = 
specExecutorJson.get(SerializationConstants.SPEC_EXECUTOR_CLASS_KEY).getAsString();
       SpecExecutor specExecutor;
       try {
-        specExecutor =
-            (SpecExecutor) 
GobblinConstructorUtils.invokeLongestConstructor(Class.forName(className), 
specExecutorConfig);
-      } catch (ReflectiveOperationException e) {
-        log.error("Error deserializing specExecuor {}", specExecutorConfig);
+        URI specExecutorUri = new 
URI(specExecutorConfig.getString(SerializationConstants.SPEC_EXECUTOR_URI_KEY));
+        specExecutor = 
this.topologySpecMap.get(specExecutorUri).getSpecExecutor();
+      } catch (Exception e) {
+        log.error("Error deserializing specExecutor {}", specExecutorConfig);
         throw new RuntimeException(e);
       }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
index ecffad9..aa8ea9e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
@@ -28,6 +28,7 @@ public class SerializationConstants {
   public static final String SPEC_EXECUTOR_CONFIG_KEY = "config";
   public static final String SPEC_EXECUTOR_CLASS_KEY = "class";
   public static final String SPEC_EXECUTOR_KEY = "specExecutor";
+  public static final String SPEC_EXECUTOR_URI_KEY = "uri";
 
   public static final String EXECUTION_STATUS_KEY = "executionStatus";
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 620c2ee..467ac84 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -70,7 +71,8 @@ public class DagManagerTest {
     FileUtils.deleteDirectory(new File(this.dagStateStoreDir));
     Config config = ConfigFactory.empty()
         .withValue(DagManager.DAG_STATESTORE_DIR, 
ConfigValueFactory.fromAnyRef(this.dagStateStoreDir));
-    this._dagStateStore = new FSDagStateStore(config);
+
+    this._dagStateStore = new FSDagStateStore(config, new HashMap<>());
     this._jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
     this.queue = new LinkedBlockingQueue<>();
     this._dagManagerThread = new 
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, queue, true);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
index 073d091..8611b66 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
@@ -21,9 +21,13 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -38,25 +42,52 @@ import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 public class FSDagStateStoreTest {
   private DagStateStore _dagStateStore;
   private final String dagStateStoreDir = 
"/tmp/fsDagStateStoreTest/dagStateStore";
   private File checkpointDir;
+  private Map<URI, TopologySpec> topologySpecMap;
+  private TopologySpec topologySpec;
+  private URI specExecURI;
 
   @BeforeClass
-  public void setUp() throws IOException {
+  public void setUp()
+      throws IOException, URISyntaxException {
     this.checkpointDir = new File(dagStateStoreDir);
     FileUtils.deleteDirectory(this.checkpointDir);
     Config config = 
ConfigFactory.empty().withValue(DagManager.DAG_STATESTORE_DIR, 
ConfigValueFactory.fromAnyRef(
         this.dagStateStoreDir));
-    this._dagStateStore = new FSDagStateStore(config);
+    this.topologySpecMap = new HashMap<>();
+
+    String specStoreDir = "/tmp/specStoreDir";
+    String specExecInstance = "mySpecExecutor";
+    Properties properties = new Properties();
+    properties.put("specStore.fs.dir", specStoreDir);
+    properties.put("specExecInstance.capabilities", "source:destination");
+    properties.put("specExecInstance.uri", specExecInstance);
+    properties.put("uri",specExecInstance);
+
+    Config specExecConfig = ConfigUtils.propertiesToConfig(properties);
+    SpecExecutor specExecutorInstanceProducer = new 
InMemorySpecExecutor(specExecConfig);
+    TopologySpec.Builder topologySpecBuilder = TopologySpec.builder(new 
Path(specStoreDir).toUri())
+        .withConfig(specExecConfig)
+        .withDescription("test")
+        .withVersion("1")
+        .withSpecExecutor(specExecutorInstanceProducer);
+    this.topologySpec = topologySpecBuilder.build();
+    this.specExecURI = new URI(specExecInstance);
+    this.topologySpecMap.put(this.specExecURI, topologySpec);
+
+    this._dagStateStore = new FSDagStateStore(config, this.topologySpecMap);
   }
 
   /**
@@ -77,7 +108,7 @@ public class FSDagStateStoreTest {
       }
       JobSpec js = JobSpec.builder("test_job" + 
suffix).withVersion(suffix).withConfig(jobConfig).
           withTemplate(new URI("job" + suffix)).build();
-      SpecExecutor specExecutor = new 
InMemorySpecExecutor(ConfigFactory.empty());
+      SpecExecutor specExecutor = this.topologySpec.getSpecExecutor();
       JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, 
specExecutor);
       jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
       jobExecutionPlans.add(jobExecutionPlan);
@@ -144,6 +175,8 @@ public class FSDagStateStoreTest {
       Assert.assertEquals(dag.getStartNodes().size(), 1);
       Assert.assertEquals(dag.getEndNodes().size(), 1);
       Assert.assertEquals(dag.getParentChildMap().size(), 1);
+      
Assert.assertEquals(dag.getNodes().get(0).getValue().getSpecExecutor().getUri(),
 specExecURI);
+      
Assert.assertEquals(dag.getNodes().get(1).getValue().getSpecExecutor().getUri(),
 specExecURI);
     }
   }
 

Reply via email to