APEXCORE-104 Added flattening of module into parent DAG
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/93a6c5dc Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/93a6c5dc Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/93a6c5dc Branch: refs/heads/feature-module Commit: 93a6c5dc8b7bc3cf039e34ebba8ef2d11f41abdd Parents: 6e6fd05 Author: chinmaykolhatkar <[email protected]> Authored: Wed Oct 7 15:06:36 2015 +0530 Committer: Tushar R. Gosavi <[email protected]> Committed: Fri Dec 18 09:56:20 2015 +0530 ---------------------------------------------------------------------- api/src/main/java/com/datatorrent/api/DAG.java | 4 - .../stram/plan/logical/LogicalPlan.java | 197 +++++++++++++++---- .../plan/logical/LogicalPlanConfiguration.java | 8 + 3 files changed, 170 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/93a6c5dc/api/src/main/java/com/datatorrent/api/DAG.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java index 1dce402..7c793f9 100644 --- a/api/src/main/java/com/datatorrent/api/DAG.java +++ b/api/src/main/java/com/datatorrent/api/DAG.java @@ -165,10 +165,6 @@ public interface DAG extends DAGContext, Serializable String getName(); Module getModule(); - - InputPortMeta getMeta(Operator.InputPort<?> port); - - OutputPortMeta getMeta(Operator.OutputPort<?> port); } /** http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/93a6c5dc/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java 
index 126d77a..3a2e76b 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -32,12 +32,14 @@ import javax.validation.constraints.NotNull; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Sets; import com.datatorrent.api.*; @@ -49,7 +51,6 @@ import com.datatorrent.api.Operator.Unifier; import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; - import com.datatorrent.common.experimental.AppData; import com.datatorrent.common.metric.MetricsAggregator; import com.datatorrent.common.metric.SingleMetricAggregator; @@ -81,6 +82,7 @@ public class LogicalPlan implements Serializable, DAG public static final String SER_FILE_NAME = "dt-conf.ser"; public static final String LAUNCH_CONFIG_FILE_NAME = "dt-launch-config.xml"; private static final transient AtomicInteger logicalOperatorSequencer = new AtomicInteger(); + public static final String MODULE_NAMESPACE_SEPARATOR = "$"; /** * Constant @@ -147,6 +149,7 @@ public class LogicalPlan implements Serializable, DAG private final Map<String, StreamMeta> streams = new HashMap<String, StreamMeta>(); private final Map<String, OperatorMeta> operators = new HashMap<String, OperatorMeta>(); + public final Map<String, ModuleMeta> modules = new LinkedHashMap<>(); private final List<OperatorMeta> rootOperators = new ArrayList<OperatorMeta>(); private final Attribute.AttributeMap attributes = new 
DefaultAttributeMap(); private transient int nodeIndex = 0; // used for cycle validation @@ -406,6 +409,7 @@ public class LogicalPlan implements Serializable, DAG private String persistOperatorName; public Map<InputPortMeta, OperatorMeta> sinkSpecificPersistOperatorMap; public Map<InputPortMeta, InputPortMeta> sinkSpecificPersistInputPortMap; + private String moduleName; // Name of the module which has this stream. null if top level stream. private StreamMeta(String id) { @@ -434,6 +438,16 @@ public class LogicalPlan implements Serializable, DAG return this; } + public String getModuleName() + { + return moduleName; + } + + public void setModuleName(String moduleName) + { + this.moduleName = moduleName; + } + public OutputPortMeta getSource() { return source; @@ -734,6 +748,7 @@ public class LogicalPlan implements Serializable, DAG private transient Integer lowlink; // for cycle detection private transient Operator operator; private MetricAggregatorMeta metricAggregatorMeta; + private String moduleName; // Name of the module which has this operator. null if this is a top level operator. /* * Used for OIO validation, @@ -820,6 +835,16 @@ public class LogicalPlan implements Serializable, DAG return metricAggregatorMeta; } + public String getModuleName() + { + return moduleName; + } + + public void setModuleName(String moduleName) + { + this.moduleName = moduleName; + } + protected void populateAggregatorMeta() { AutoMetric.Aggregator aggregator = getValue(OperatorContext.METRICS_AGGREGATOR); @@ -1074,83 +1099,153 @@ public class LogicalPlan implements Serializable, DAG throw new IllegalArgumentException("duplicate operator id: " + operators.get(name)); } + // Avoid name conflict with module. 
+ if (modules.containsKey(name)) + throw new IllegalArgumentException("duplicate operator id: " + operators.get(name)); + OperatorMeta decl = new OperatorMeta(name, operator); rootOperators.add(decl); // will be removed when a sink is added to an input port for this operator operators.put(name, decl); return operator; } + /** + * Module meta object. + */ public final class ModuleMeta implements DAG.ModuleMeta, Serializable { private final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams = new LinkedHashMap<InputPortMeta, StreamMeta>(); private final LinkedHashMap<OutputPortMeta, StreamMeta> outputStreams = new LinkedHashMap<OutputPortMeta, StreamMeta>(); private final Attribute.AttributeMap attributes; - @SuppressWarnings("unused") - private final int id; @NotNull - private final String name; - private transient Integer nindex; // for cycle detection - private transient Integer lowlink; // for cycle detection + private String name; private transient Module module; + private ModuleMeta parent; + private LogicalPlan dag = null; + private transient String fullName; - public ModuleMeta(String name, Module module) - { - this(name, module, new DefaultAttributeMap()); - } - - public ModuleMeta(String name, Module module, DefaultAttributeMap attributeMap) + private ModuleMeta(String name, Module module) { LOG.debug("Initializing {} as {}", name, module.getClass().getName()); this.name = name; this.module = module; - this.id = logicalOperatorSequencer.decrementAndGet(); - this.attributes = attributeMap; + this.attributes = new DefaultAttributeMap(); + this.dag = new LogicalPlan(); } - @Override public String getName() + @Override + public String getName() { return name; } - @Override public Module getModule() + @Override + public Module getModule() { return module; } - @Override public DAG.InputPortMeta getMeta(InputPort<?> port) + @Override + public Attribute.AttributeMap getAttributes() { - return null; + return attributes; } - @Override public DAG.OutputPortMeta 
getMeta(OutputPort<?> port) + @Override + public <T> T getValue(Attribute<T> key) { - return null; + return attributes.get(key); } - @Override public Attribute.AttributeMap getAttributes() + @Override + public void setCounters(Object counters) { - return null; + + } + + @Override + public void sendMetrics(Collection<String> metricNames) + { + } - @Override public <T> T getValue(Attribute<T> key) + public LinkedHashMap<InputPortMeta, StreamMeta> getInputStreams() { - return null; + return inputStreams; } - @Override public void setCounters(Object counters) + public LinkedHashMap<OutputPortMeta, StreamMeta> getOutputStreams() { + return outputStreams; + } + public LogicalPlan getDag() + { + return dag; } - @Override public void sendMetrics(Collection<String> metricNames) + private void writeObject(ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + FSStorageAgent.store(out, module); + } + private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException + { + input.defaultReadObject(); + module = (Module)FSStorageAgent.retrieve(input); } - } - public transient Map<String, ModuleMeta> modules = Maps.newHashMap(); + /** + * Expand the module and add its operator to the parentDAG. After this method finishes the + * module is expanded fully with all its submodules also expanded. The parentDAG contains + * the operator added by all the modules. + * + * @param parentDAG parent dag to populate with operators from this and inner modules. + * @param conf configuration object. + */ + public void flattenModule(LogicalPlan parentDAG, Configuration conf) + { + module.populateDAG(dag, conf); + for (ModuleMeta subModuleMeta : dag.getAllModules()) { + subModuleMeta.setParent(this); + subModuleMeta.flattenModule(dag, conf); + } + parentDAG.addDAGToCurrentDAG(this); + } + + /** + * Return full name of the module. 
If this is an inner module, i.e. a module inside of module this method will + * traverse till the top level module, and construct the name by concatenating name of modules in the chain + * in reverse order separated by MODULE_NAMESPACE_SEPARATOR. + * + * For example, if there is module M1, which adds another module M2 in the DAG. Then the full name of the + * module M2 is ("M1" + MODULE_NAMESPACE_SEPARATOR + "M2") + * + * @return full name of the module. + */ + public String getFullName() + { + if (fullName != null) + return fullName; - @Override public <T extends Module> T addModule(String name, T module) + if (parent == null) { + fullName = name; + } else { + fullName = parent.getFullName() + MODULE_NAMESPACE_SEPARATOR + name; + } + return fullName; + } + + private void setParent(ModuleMeta meta) { + this.parent = meta; + } + + private static final long serialVersionUID = 7562277769188329223L; + } + + @Override + public <T extends Module> T addModule(String name, T module) { if (modules.containsKey(name)) { if (modules.get(name).module == module) { @@ -1158,12 +1253,16 @@ public class LogicalPlan implements Serializable, DAG } throw new IllegalArgumentException("duplicate module is: " + modules.get(name)); } + if (operators.containsKey(name)) + throw new IllegalArgumentException("duplicate module is: " + modules.get(name)); + ModuleMeta meta = new ModuleMeta(name, module); modules.put(name, meta); return module; } - @Override public <T extends Module> T addModule(String name, Class<T> clazz) + @Override + public <T extends Module> T addModule(String name, Class<T> clazz) { T instance; try { @@ -1219,6 +1318,33 @@ public class LogicalPlan implements Serializable, DAG return s; } + @SuppressWarnings({ "unchecked", "rawtypes" }) + private void addDAGToCurrentDAG(ModuleMeta moduleMeta) + { + LogicalPlan subDag = moduleMeta.getDag(); + String subDAGName = moduleMeta.getName(); + String name; + for (OperatorMeta operatorMeta : subDag.getAllOperators()) { + name = subDAGName 
+ MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName(); + this.addOperator(name, operatorMeta.getOperator()); + OperatorMeta operatorMetaNew = this.getOperatorMeta(name); + operatorMetaNew.setModuleName(operatorMeta.getModuleName() == null ? subDAGName : subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getModuleName()); + } + + for (StreamMeta streamMeta : subDag.getAllStreams()) { + OutputPortMeta sourceMeta = streamMeta.getSource(); + List<InputPort<?>> ports = new LinkedList<>(); + for (InputPortMeta inputPortMeta : streamMeta.getSinks()) { + ports.add(inputPortMeta.getPortObject()); + } + InputPort[] inputPorts = ports.toArray(new InputPort[]{}); + + name = subDAGName + MODULE_NAMESPACE_SEPARATOR + streamMeta.getName(); + StreamMeta streamMetaNew = this.addStream(name, sourceMeta.getPortObject(), inputPorts); + streamMetaNew.setModuleName(streamMeta.getModuleName() == null ? subDAGName : subDAGName + "_" + streamMeta.getModuleName()); + } + } + @Override @SuppressWarnings("unchecked") public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? 
super T> sink1) @@ -1267,7 +1393,7 @@ public class LogicalPlan implements Serializable, DAG private OutputPortMeta assertGetPortMeta(Operator.OutputPort<?> port) { - for (OperatorMeta o: getAllOperators()) { + for (OperatorMeta o : getAllOperators()) { OutputPortMeta opm = o.getPortMapping().outPortMap.get(port); if (opm != null) { return opm; @@ -1278,7 +1404,7 @@ public class LogicalPlan implements Serializable, DAG private InputPortMeta assertGetPortMeta(Operator.InputPort<?> port) { - for (OperatorMeta o: getAllOperators()) { + for (OperatorMeta o : getAllOperators()) { InputPortMeta opm = o.getPortMapping().inPortMap.get(port); if (opm != null) { return opm; @@ -1315,7 +1441,8 @@ public class LogicalPlan implements Serializable, DAG return Collections.unmodifiableCollection(this.operators.values()); } - public Collection<ModuleMeta> getAllModules() { + public Collection<ModuleMeta> getAllModules() + { return Collections.unmodifiableCollection(this.modules.values()); } @@ -1332,7 +1459,7 @@ public class LogicalPlan implements Serializable, DAG public ModuleMeta getModuleMeta(String moduleName) { - return null; + return this.modules.get(moduleName); } @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/93a6c5dc/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java index 9bbe85c..6dc4c0c 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java @@ -2116,12 +2116,20 @@ public class LogicalPlanConfiguration { // Expand the modules within the dag recursively setModuleProperties(dag, appName); + flattenDAG(dag, conf); // inject 
external operator configuration setOperatorConfiguration(dag, appConfs, appName); setStreamConfiguration(dag, appConfs, appName); } + private void flattenDAG(LogicalPlan dag, Configuration conf) + { + for (ModuleMeta moduleMeta : dag.getAllModules()) { + moduleMeta.flattenModule(dag, conf); + } + } + public static Properties readProperties(String filePath) throws IOException { InputStream is = new FileInputStream(filePath);
