APEXCORE-104 Added flattening of module into parent DAG
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/14a09bb5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/14a09bb5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/14a09bb5 Branch: refs/heads/master Commit: 14a09bb51cb30585e7979ec022ec762ac2ba91e5 Parents: b0360d4 Author: chinmaykolhatkar <[email protected]> Authored: Wed Oct 7 15:06:36 2015 +0530 Committer: chinmaykolhatkar <[email protected]> Committed: Tue Dec 22 01:42:43 2015 +0530 ---------------------------------------------------------------------- api/src/main/java/com/datatorrent/api/DAG.java | 4 - .../stram/plan/logical/LogicalPlan.java | 169 +++++++++++++++---- .../plan/logical/LogicalPlanConfiguration.java | 8 + 3 files changed, 146 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/14a09bb5/api/src/main/java/com/datatorrent/api/DAG.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java index 1dce402..7c793f9 100644 --- a/api/src/main/java/com/datatorrent/api/DAG.java +++ b/api/src/main/java/com/datatorrent/api/DAG.java @@ -165,10 +165,6 @@ public interface DAG extends DAGContext, Serializable String getName(); Module getModule(); - - InputPortMeta getMeta(Operator.InputPort<?> port); - - OutputPortMeta getMeta(Operator.OutputPort<?> port); } /** http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/14a09bb5/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 
53e81bc..5a3e167 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -80,6 +80,7 @@ public class LogicalPlan implements Serializable, DAG public static final String SER_FILE_NAME = "dt-conf.ser"; public static final String LAUNCH_CONFIG_FILE_NAME = "dt-launch-config.xml"; private static final transient AtomicInteger logicalOperatorSequencer = new AtomicInteger(); + public static final String MODULE_NAMESPACE_SEPARATOR = "$"; /** * Constant @@ -146,6 +147,7 @@ public class LogicalPlan implements Serializable, DAG private final Map<String, StreamMeta> streams = new HashMap<String, StreamMeta>(); private final Map<String, OperatorMeta> operators = new HashMap<String, OperatorMeta>(); + public final Map<String, ModuleMeta> modules = new LinkedHashMap<>(); private final List<OperatorMeta> rootOperators = new ArrayList<OperatorMeta>(); private final Attribute.AttributeMap attributes = new DefaultAttributeMap(); private transient int nodeIndex = 0; // used for cycle validation @@ -733,6 +735,7 @@ public class LogicalPlan implements Serializable, DAG private transient Integer lowlink; // for cycle detection private transient Operator operator; private MetricAggregatorMeta metricAggregatorMeta; + private String moduleName; // Name of the module which has this operator. null if this is a top level operator. 
/* * Used for OIO validation, @@ -819,6 +822,16 @@ public class LogicalPlan implements Serializable, DAG return metricAggregatorMeta; } + public String getModuleName() + { + return moduleName; + } + + public void setModuleName(String moduleName) + { + this.moduleName = moduleName; + } + protected void populateAggregatorMeta() { AutoMetric.Aggregator aggregator = getValue(OperatorContext.METRICS_AGGREGATOR); @@ -1073,37 +1086,38 @@ public class LogicalPlan implements Serializable, DAG throw new IllegalArgumentException("duplicate operator id: " + operators.get(name)); } + // Avoid name conflict with module. + if (modules.containsKey(name)) + throw new IllegalArgumentException("duplicate operator id: " + operators.get(name)); + OperatorMeta decl = new OperatorMeta(name, operator); rootOperators.add(decl); // will be removed when a sink is added to an input port for this operator operators.put(name, decl); return operator; } + /** + * Module meta object. + */ public final class ModuleMeta implements DAG.ModuleMeta, Serializable { private final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams = new LinkedHashMap<>(); private final LinkedHashMap<OutputPortMeta, StreamMeta> outputStreams = new LinkedHashMap<>(); private final Attribute.AttributeMap attributes; - @SuppressWarnings("unused") - private final int id; @NotNull - private final String name; - private transient Integer nindex; // for cycle detection - private transient Integer lowlink; // for cycle detection + private String name; private transient Module module; + private ModuleMeta parent; + private LogicalPlan dag = null; + private transient String fullName; - public ModuleMeta(String name, Module module) - { - this(name, module, new DefaultAttributeMap()); - } - - public ModuleMeta(String name, Module module, DefaultAttributeMap attributeMap) + private ModuleMeta(String name, Module module) { LOG.debug("Initializing {} as {}", name, module.getClass().getName()); this.name = name; this.module = module; - 
this.id = logicalOperatorSequencer.decrementAndGet(); - this.attributes = attributeMap; + this.attributes = new DefaultAttributeMap(); + this.dag = new LogicalPlan(); } @Override @@ -1119,43 +1133,104 @@ public class LogicalPlan implements Serializable, DAG } @Override - public DAG.InputPortMeta getMeta(InputPort<?> port) + public Attribute.AttributeMap getAttributes() { - return null; + return attributes; } @Override - public DAG.OutputPortMeta getMeta(OutputPort<?> port) + public <T> T getValue(Attribute<T> key) { - return null; + return attributes.get(key); } @Override - public Attribute.AttributeMap getAttributes() + public void setCounters(Object counters) { - return null; + } @Override - public <T> T getValue(Attribute<T> key) + public void sendMetrics(Collection<String> metricNames) { - return null; + } - @Override - public void setCounters(Object counters) + public LinkedHashMap<InputPortMeta, StreamMeta> getInputStreams() { + return inputStreams; + } + public LinkedHashMap<OutputPortMeta, StreamMeta> getOutputStreams() + { + return outputStreams; } - @Override - public void sendMetrics(Collection<String> metricNames) + public LogicalPlan getDag() { + return dag; + } + private void writeObject(ObjectOutputStream out) throws IOException + { + out.defaultWriteObject(); + FSStorageAgent.store(out, module); } - } - public transient Map<String, ModuleMeta> modules = Maps.newHashMap(); + private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException + { + input.defaultReadObject(); + module = (Module)FSStorageAgent.retrieve(input); + } + + /** + * Expands the module and adds its operators and streams to the parentDAG. After this method finishes, the module is + * fully expanded, with all its submodules also expanded, and the parentDAG contains the operators added by all the modules. + * + * @param parentDAG parent dag to populate with operators from this and inner modules. + * @param conf configuration object.
+ */ + public void flattenModule(LogicalPlan parentDAG, org.apache.hadoop.conf.Configuration conf) + { + module.populateDAG(dag, conf); + for (ModuleMeta subModuleMeta : dag.getAllModules()) { + subModuleMeta.setParent(this); + subModuleMeta.flattenModule(dag, conf); + } + parentDAG.addDAGToCurrentDAG(this); + } + + /** + * Returns the full name of the module. If this is an inner module, i.e. a module nested inside another module, this method + * walks up to the top-level module and builds the name by concatenating the module names from the outermost module down to + * this one, separated by MODULE_NAMESPACE_SEPARATOR. + * + * For example, if module M1 adds another module M2 to its DAG, then the full name of module M2 + * is ("M1" + MODULE_NAMESPACE_SEPARATOR + "M2"). The computed name is cached after the first call. + * + * @return full name of the module. + */ + public String getFullName() + { + if (fullName != null) { + return fullName; + } + + if (parent == null) { + fullName = name; + } else { + fullName = parent.getFullName() + MODULE_NAMESPACE_SEPARATOR + name; + } + return fullName; + } + + private void setParent(ModuleMeta meta) + { + this.parent = meta; + } + + private static final long serialVersionUID = 7562277769188329223L; + } @Override public <T extends Module> T addModule(String name, T module) @@ -1166,6 +1241,10 @@ public class LogicalPlan implements Serializable, DAG } throw new IllegalArgumentException("duplicate module is: " + modules.get(name)); } + if (operators.containsKey(name)) { + throw new IllegalArgumentException("duplicate module is: " + modules.get(name)); + } + ModuleMeta meta = new ModuleMeta(name, module); modules.put(name, meta); return module; @@ -1228,6 +1307,33 @@ public class LogicalPlan implements Serializable, DAG return s; } + @SuppressWarnings({ "unchecked", "rawtypes" }) + private void addDAGToCurrentDAG(ModuleMeta moduleMeta) + { + LogicalPlan subDag = moduleMeta.getDag(); + String subDAGName = moduleMeta.getName(); + String name; + for (OperatorMeta operatorMeta :
subDag.getAllOperators()) { + name = subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName(); + this.addOperator(name, operatorMeta.getOperator()); + OperatorMeta operatorMetaNew = this.getOperatorMeta(name); + operatorMetaNew.setModuleName(operatorMeta.getModuleName() == null ? subDAGName : + subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getModuleName()); + } + + for (StreamMeta streamMeta : subDag.getAllStreams()) { + OutputPortMeta sourceMeta = streamMeta.getSource(); + List<InputPort<?>> ports = new LinkedList<>(); + for (InputPortMeta inputPortMeta : streamMeta.getSinks()) { + ports.add(inputPortMeta.getPortObject()); + } + InputPort[] inputPorts = ports.toArray(new InputPort[]{}); + + name = subDAGName + MODULE_NAMESPACE_SEPARATOR + streamMeta.getName(); + this.addStream(name, sourceMeta.getPortObject(), inputPorts); + } + } + @Override @SuppressWarnings("unchecked") public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? 
super T> sink1) @@ -1276,7 +1382,7 @@ public class LogicalPlan implements Serializable, DAG private OutputPortMeta assertGetPortMeta(Operator.OutputPort<?> port) { - for (OperatorMeta o: getAllOperators()) { + for (OperatorMeta o : getAllOperators()) { OutputPortMeta opm = o.getPortMapping().outPortMap.get(port); if (opm != null) { return opm; @@ -1287,7 +1393,7 @@ public class LogicalPlan implements Serializable, DAG private InputPortMeta assertGetPortMeta(Operator.InputPort<?> port) { - for (OperatorMeta o: getAllOperators()) { + for (OperatorMeta o : getAllOperators()) { InputPortMeta opm = o.getPortMapping().inPortMap.get(port); if (opm != null) { return opm; @@ -1324,7 +1430,8 @@ public class LogicalPlan implements Serializable, DAG return Collections.unmodifiableCollection(this.operators.values()); } - public Collection<ModuleMeta> getAllModules() { + public Collection<ModuleMeta> getAllModules() + { return Collections.unmodifiableCollection(this.modules.values()); } @@ -1341,7 +1448,7 @@ public class LogicalPlan implements Serializable, DAG public ModuleMeta getModuleMeta(String moduleName) { - return null; + return this.modules.get(moduleName); } @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/14a09bb5/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java index 9bbe85c..6dc4c0c 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java @@ -2116,12 +2116,20 @@ public class LogicalPlanConfiguration { // Expand the modules within the dag recursively setModuleProperties(dag, appName); + flattenDAG(dag, conf); // inject 
external operator configuration setOperatorConfiguration(dag, appConfs, appName); setStreamConfiguration(dag, appConfs, appName); } + private void flattenDAG(LogicalPlan dag, Configuration conf) + { + for (ModuleMeta moduleMeta : dag.getAllModules()) { + moduleMeta.flattenModule(dag, conf); + } + } + public static Properties readProperties(String filePath) throws IOException { InputStream is = new FileInputStream(filePath);
