APEXCORE-104 Added flattening of module into parent DAG

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/93a6c5dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/93a6c5dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/93a6c5dc

Branch: refs/heads/feature-module
Commit: 93a6c5dc8b7bc3cf039e34ebba8ef2d11f41abdd
Parents: 6e6fd05
Author: chinmaykolhatkar <[email protected]>
Authored: Wed Oct 7 15:06:36 2015 +0530
Committer: Tushar R. Gosavi <[email protected]>
Committed: Fri Dec 18 09:56:20 2015 +0530

----------------------------------------------------------------------
 api/src/main/java/com/datatorrent/api/DAG.java  |   4 -
 .../stram/plan/logical/LogicalPlan.java         | 197 +++++++++++++++----
 .../plan/logical/LogicalPlanConfiguration.java  |   8 +
 3 files changed, 170 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/93a6c5dc/api/src/main/java/com/datatorrent/api/DAG.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DAG.java 
b/api/src/main/java/com/datatorrent/api/DAG.java
index 1dce402..7c793f9 100644
--- a/api/src/main/java/com/datatorrent/api/DAG.java
+++ b/api/src/main/java/com/datatorrent/api/DAG.java
@@ -165,10 +165,6 @@ public interface DAG extends DAGContext, Serializable
     String getName();
 
     Module getModule();
-
-    InputPortMeta getMeta(Operator.InputPort<?> port);
-
-    OutputPortMeta getMeta(Operator.OutputPort<?> port);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/93a6c5dc/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java 
b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 126d77a..3a2e76b 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -32,12 +32,14 @@ import javax.validation.constraints.NotNull;
 
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Sets;
 
 import com.datatorrent.api.*;
@@ -49,7 +51,6 @@ import com.datatorrent.api.Operator.Unifier;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-
 import com.datatorrent.common.experimental.AppData;
 import com.datatorrent.common.metric.MetricsAggregator;
 import com.datatorrent.common.metric.SingleMetricAggregator;
@@ -81,6 +82,7 @@ public class LogicalPlan implements Serializable, DAG
   public static final String SER_FILE_NAME = "dt-conf.ser";
   public static final String LAUNCH_CONFIG_FILE_NAME = "dt-launch-config.xml";
   private static final transient AtomicInteger logicalOperatorSequencer = new 
AtomicInteger();
+  public static final String MODULE_NAMESPACE_SEPARATOR = "$";
 
   /**
    * Constant
@@ -147,6 +149,7 @@ public class LogicalPlan implements Serializable, DAG
 
   private final Map<String, StreamMeta> streams = new HashMap<String, 
StreamMeta>();
   private final Map<String, OperatorMeta> operators = new HashMap<String, 
OperatorMeta>();
+  public final Map<String, ModuleMeta> modules = new LinkedHashMap<>();
   private final List<OperatorMeta> rootOperators = new 
ArrayList<OperatorMeta>();
   private final Attribute.AttributeMap attributes = new DefaultAttributeMap();
   private transient int nodeIndex = 0; // used for cycle validation
@@ -406,6 +409,7 @@ public class LogicalPlan implements Serializable, DAG
     private String persistOperatorName;
     public Map<InputPortMeta, OperatorMeta> sinkSpecificPersistOperatorMap;
     public Map<InputPortMeta, InputPortMeta> sinkSpecificPersistInputPortMap;
+    private String moduleName;  // Name of the module which has this stream. 
null if top level stream.
 
     private StreamMeta(String id)
     {
@@ -434,6 +438,16 @@ public class LogicalPlan implements Serializable, DAG
       return this;
     }
 
+    public String getModuleName()
+    {
+      return moduleName;
+    }
+
+    public void setModuleName(String moduleName)
+    {
+      this.moduleName = moduleName;
+    }
+
     public OutputPortMeta getSource()
     {
       return source;
@@ -734,6 +748,7 @@ public class LogicalPlan implements Serializable, DAG
     private transient Integer lowlink; // for cycle detection
     private transient Operator operator;
     private MetricAggregatorMeta metricAggregatorMeta;
+    private String moduleName;  // Name of the module which has this operator. 
null if this is a top level operator.
 
     /*
      * Used for  OIO validation,
@@ -820,6 +835,16 @@ public class LogicalPlan implements Serializable, DAG
       return metricAggregatorMeta;
     }
 
+    public String getModuleName()
+    {
+      return moduleName;
+    }
+
+    public void setModuleName(String moduleName)
+    {
+      this.moduleName = moduleName;
+    }
+
     protected void populateAggregatorMeta()
     {
       AutoMetric.Aggregator aggregator = 
getValue(OperatorContext.METRICS_AGGREGATOR);
@@ -1074,83 +1099,153 @@ public class LogicalPlan implements Serializable, DAG
       throw new IllegalArgumentException("duplicate operator id: " + 
operators.get(name));
     }
 
+    // Avoid name conflict with module.
+    if (modules.containsKey(name))
+      throw new IllegalArgumentException("duplicate operator id: " + 
operators.get(name));
+
     OperatorMeta decl = new OperatorMeta(name, operator);
     rootOperators.add(decl); // will be removed when a sink is added to an 
input port for this operator
     operators.put(name, decl);
     return operator;
   }
 
+  /**
+   * Module meta object.
+   */
   public final class ModuleMeta implements DAG.ModuleMeta, Serializable
   {
     private final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams = new 
LinkedHashMap<InputPortMeta, StreamMeta>();
     private final LinkedHashMap<OutputPortMeta, StreamMeta> outputStreams = 
new LinkedHashMap<OutputPortMeta, StreamMeta>();
     private final Attribute.AttributeMap attributes;
-    @SuppressWarnings("unused")
-    private final int id;
     @NotNull
-    private final String name;
-    private transient Integer nindex; // for cycle detection
-    private transient Integer lowlink; // for cycle detection
+    private String name;
     private transient Module module;
+    private ModuleMeta parent;
+    private LogicalPlan dag = null;
+    private transient String fullName;
 
-    public ModuleMeta(String name, Module module)
-    {
-      this(name, module, new DefaultAttributeMap());
-    }
-
-    public ModuleMeta(String name, Module module, DefaultAttributeMap 
attributeMap)
+    private ModuleMeta(String name, Module module)
     {
       LOG.debug("Initializing {} as {}", name, module.getClass().getName());
       this.name = name;
       this.module = module;
-      this.id = logicalOperatorSequencer.decrementAndGet();
-      this.attributes = attributeMap;
+      this.attributes = new DefaultAttributeMap();
+      this.dag = new LogicalPlan();
     }
 
-    @Override public String getName()
+    @Override
+    public String getName()
     {
       return name;
     }
 
-    @Override public Module getModule()
+    @Override
+    public Module getModule()
     {
       return module;
     }
 
-    @Override public DAG.InputPortMeta getMeta(InputPort<?> port)
+    @Override
+    public Attribute.AttributeMap getAttributes()
     {
-      return null;
+      return attributes;
     }
 
-    @Override public DAG.OutputPortMeta getMeta(OutputPort<?> port)
+    @Override
+    public <T> T getValue(Attribute<T> key)
     {
-      return null;
+      return attributes.get(key);
     }
 
-    @Override public Attribute.AttributeMap getAttributes()
+    @Override
+    public void setCounters(Object counters)
     {
-      return null;
+
+    }
+
+    @Override
+    public void sendMetrics(Collection<String> metricNames)
+    {
+
     }
 
-    @Override public <T> T getValue(Attribute<T> key)
+    public LinkedHashMap<InputPortMeta, StreamMeta> getInputStreams()
     {
-      return null;
+      return inputStreams;
     }
 
-    @Override public void setCounters(Object counters)
+    public LinkedHashMap<OutputPortMeta, StreamMeta> getOutputStreams()
     {
+      return outputStreams;
+    }
 
+    public LogicalPlan getDag()
+    {
+      return dag;
     }
 
-    @Override public void sendMetrics(Collection<String> metricNames)
+    private void writeObject(ObjectOutputStream out) throws IOException
     {
+      out.defaultWriteObject();
+      FSStorageAgent.store(out, module);
+    }
 
+    private void readObject(ObjectInputStream input) throws IOException, 
ClassNotFoundException
+    {
+      input.defaultReadObject();
+      module = (Module)FSStorageAgent.retrieve(input);
     }
-  }
 
-  public transient Map<String, ModuleMeta> modules = Maps.newHashMap();
+    /**
+     * Expand the module and add its operator to the parentDAG. After this 
method finishes the
+     * module is expanded fully with all its submodules also expanded. The 
parentDAG contains
+     * the operator added by all the modules.
+     *
+     * @param parentDAG parent dag to populate with operators from this and 
inner modules.
+     * @param conf configuration object.
+     */
+    public void flattenModule(LogicalPlan parentDAG, Configuration conf)
+    {
+      module.populateDAG(dag, conf);
+      for (ModuleMeta subModuleMeta : dag.getAllModules()) {
+        subModuleMeta.setParent(this);
+        subModuleMeta.flattenModule(dag, conf);
+      }
+      parentDAG.addDAGToCurrentDAG(this);
+    }
+
+    /**
+     * Return full name of the module. If this is a inner module, i.e module 
inside of module this method will
+     * traverse till the top level module, and construct the name by 
concatenating name of modules in the chain
+     * in reverse order separated by MODULE_NAMESPACE_SEPARATO.
+     *
+     * For example If there is module M1, which adds another module M2 in the 
DAG. Then the full name of the
+     * module M2 is ("M1" ++ MODULE_NAMESPACE_SEPARATO + "M2")
+     *
+     * @return full name of the module.
+     */
+    public String getFullName()
+    {
+      if (fullName != null)
+        return fullName;
 
-  @Override public <T extends Module> T addModule(String name, T module)
+      if (parent == null) {
+        fullName = name;
+      } else {
+        fullName = parent.getFullName() + MODULE_NAMESPACE_SEPARATOR + name;
+      }
+      return fullName;
+    }
+
+    private void setParent(ModuleMeta meta) {
+      this.parent = meta;
+    }
+
+    private static final long serialVersionUID = 7562277769188329223L;
+  }
+
+  @Override
+  public <T extends Module> T addModule(String name, T module)
   {
     if (modules.containsKey(name)) {
       if (modules.get(name).module == module) {
@@ -1158,12 +1253,16 @@ public class LogicalPlan implements Serializable, DAG
       }
       throw new IllegalArgumentException("duplicate module is: " + 
modules.get(name));
     }
+    if (operators.containsKey(name))
+      throw new IllegalArgumentException("duplicate module is: " + 
modules.get(name));
+
     ModuleMeta meta = new ModuleMeta(name, module);
     modules.put(name, meta);
     return module;
   }
 
-  @Override public <T extends Module> T addModule(String name, Class<T> clazz)
+  @Override
+  public <T extends Module> T addModule(String name, Class<T> clazz)
   {
     T instance;
     try {
@@ -1219,6 +1318,33 @@ public class LogicalPlan implements Serializable, DAG
     return s;
   }
 
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private void addDAGToCurrentDAG(ModuleMeta moduleMeta)
+  {
+    LogicalPlan subDag = moduleMeta.getDag();
+    String subDAGName = moduleMeta.getName();
+    String name;
+    for (OperatorMeta operatorMeta : subDag.getAllOperators()) {
+      name = subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName();
+      this.addOperator(name, operatorMeta.getOperator());
+      OperatorMeta operatorMetaNew = this.getOperatorMeta(name);
+      operatorMetaNew.setModuleName(operatorMeta.getModuleName() == null ? 
subDAGName : subDAGName + MODULE_NAMESPACE_SEPARATOR + 
operatorMeta.getModuleName());
+    }
+
+    for (StreamMeta streamMeta : subDag.getAllStreams()) {
+      OutputPortMeta sourceMeta = streamMeta.getSource();
+      List<InputPort<?>> ports = new LinkedList<>();
+      for (InputPortMeta inputPortMeta : streamMeta.getSinks()) {
+        ports.add(inputPortMeta.getPortObject());
+      }
+      InputPort[] inputPorts = ports.toArray(new InputPort[]{});
+
+      name = subDAGName + MODULE_NAMESPACE_SEPARATOR + streamMeta.getName();
+      StreamMeta streamMetaNew = this.addStream(name, 
sourceMeta.getPortObject(), inputPorts);
+      streamMetaNew.setModuleName(streamMeta.getModuleName() == null ? 
subDAGName : subDAGName + "_" + streamMeta.getModuleName());
+    }
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> 
source, Operator.InputPort<? super T> sink1)
@@ -1267,7 +1393,7 @@ public class LogicalPlan implements Serializable, DAG
 
   private OutputPortMeta assertGetPortMeta(Operator.OutputPort<?> port)
   {
-    for (OperatorMeta o: getAllOperators()) {
+    for (OperatorMeta o : getAllOperators()) {
       OutputPortMeta opm = o.getPortMapping().outPortMap.get(port);
       if (opm != null) {
         return opm;
@@ -1278,7 +1404,7 @@ public class LogicalPlan implements Serializable, DAG
 
   private InputPortMeta assertGetPortMeta(Operator.InputPort<?> port)
   {
-    for (OperatorMeta o: getAllOperators()) {
+    for (OperatorMeta o : getAllOperators()) {
       InputPortMeta opm = o.getPortMapping().inPortMap.get(port);
       if (opm != null) {
         return opm;
@@ -1315,7 +1441,8 @@ public class LogicalPlan implements Serializable, DAG
     return Collections.unmodifiableCollection(this.operators.values());
   }
 
-  public Collection<ModuleMeta> getAllModules() {
+  public Collection<ModuleMeta> getAllModules()
+  {
     return Collections.unmodifiableCollection(this.modules.values());
   }
 
@@ -1332,7 +1459,7 @@ public class LogicalPlan implements Serializable, DAG
 
   public ModuleMeta getModuleMeta(String moduleName)
   {
-    return null;
+    return this.modules.get(moduleName);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/93a6c5dc/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
 
b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 9bbe85c..6dc4c0c 100644
--- 
a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ 
b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -2116,12 +2116,20 @@ public class LogicalPlanConfiguration {
 
     // Expand the modules within the dag recursively
     setModuleProperties(dag, appName);
+    flattenDAG(dag, conf);
 
     // inject external operator configuration
     setOperatorConfiguration(dag, appConfs, appName);
     setStreamConfiguration(dag, appConfs, appName);
   }
 
+  private void flattenDAG(LogicalPlan dag, Configuration conf)
+  {
+    for (ModuleMeta moduleMeta : dag.getAllModules()) {
+      moduleMeta.flattenModule(dag, conf);
+    }
+  }
+
   public static Properties readProperties(String filePath) throws IOException
   {
     InputStream is = new FileInputStream(filePath);

Reply via email to