APEXCORE-194 Added support for proxy ports Added test cases.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/c1314eaf Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/c1314eaf Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/c1314eaf Branch: refs/heads/devel-3 Commit: c1314eafaac239b420d085a4584d5c5acaf3e69b Parents: 14a09bb Author: bhupeshchawda <[email protected]> Authored: Tue Oct 6 12:34:24 2015 +0530 Committer: Tushar R. Gosavi <[email protected]> Committed: Tue Dec 22 02:04:18 2015 +0530 ---------------------------------------------------------------------- .../main/java/com/datatorrent/api/Module.java | 120 ++++ .../stram/plan/logical/LogicalPlan.java | 51 +- .../plan/logical/LogicalPlanConfiguration.java | 1 + .../plan/logical/module/ModuleAppTest.java | 168 ++++++ .../logical/module/TestModuleExpansion.java | 552 +++++++++++++++++++ 5 files changed, 888 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c1314eaf/api/src/main/java/com/datatorrent/api/Module.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Module.java b/api/src/main/java/com/datatorrent/api/Module.java index 1220fc1..67682e7 100644 --- a/api/src/main/java/com/datatorrent/api/Module.java +++ b/api/src/main/java/com/datatorrent/api/Module.java @@ -21,8 +21,128 @@ package com.datatorrent.api; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.Operator.InputPort; +import com.datatorrent.api.Operator.OutputPort; +import com.datatorrent.api.Operator.Unifier; + +/** + * A Module is a component which can be added to the DAG similar to the operator, + * using addModule API. The module should implement populateDAG method, which + * will be called by the platform, and DAG populated by the module will be + * replace in place of the module. + * + */ @InterfaceStability.Evolving public interface Module { void populateDAG(DAG dag, Configuration conf); + + /** + * These ports allow platform to short circuit module port to the operator port. i.e When a module is expanded, it can + * specify which operator's port is used to replaced the module port in the final DAG. + * + * @param <T> data type accepted at the input port. + */ + final class ProxyInputPort<T> implements InputPort<T> + { + InputPort<T> inputPort; + + public void set(InputPort<T> port) + { + inputPort = port; + } + + public InputPort<T> get() + { + return inputPort; + } + + @Override + public void setup(PortContext context) + { + if (inputPort != null) { + inputPort.setup(context); + } + } + + @Override + public void teardown() + { + if (inputPort != null) { + inputPort.teardown(); + } + } + + @Override + public Sink<T> getSink() + { + return inputPort == null ? null : inputPort.getSink(); + } + + @Override + public void setConnected(boolean connected) + { + if (inputPort != null) { + inputPort.setConnected(connected); + } + } + + @Override + public StreamCodec<T> getStreamCodec() + { + return inputPort == null ? null : inputPort.getStreamCodec(); + } + } + + /** + * Similar to ProxyInputPort, but on output side. + * + * @param <T> datatype emitted on the port. + */ + final class ProxyOutputPort<T> implements OutputPort<T> + { + OutputPort<T> outputPort; + + public void set(OutputPort<T> port) + { + outputPort = port; + } + + public OutputPort<T> get() + { + return outputPort; + } + + @Override + public void setup(PortContext context) + { + if (outputPort != null) { + outputPort.setup(context); + } + } + + @Override + public void teardown() + { + if (outputPort != null) { + outputPort.teardown(); + } + } + + @Override + public void setSink(Sink<Object> s) + { + if (outputPort != null) { + outputPort.setSink(s); + } + } + + @Override + public Unifier<T> getUnifier() + { + return outputPort == null ? null : outputPort.getUnifier(); + } + } } + http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c1314eaf/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 5a3e167..21039cc 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -37,12 +37,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Sets; import com.datatorrent.api.*; import com.datatorrent.api.Attribute.AttributeMap; import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap; +import com.datatorrent.api.Module.ProxyInputPort; +import com.datatorrent.api.Module.ProxyOutputPort; import com.datatorrent.api.Operator.InputPort; import com.datatorrent.api.Operator.OutputPort; import com.datatorrent.api.Operator.Unifier; @@ -152,6 +154,7 @@ public class LogicalPlan implements Serializable, DAG private final Attribute.AttributeMap attributes = new DefaultAttributeMap(); private transient int nodeIndex = 0; // used for cycle validation private transient Stack<OperatorMeta> stack = new Stack<OperatorMeta>(); // used for cycle validation + private transient Map<String, ArrayListMultimap<OutputPort<?>, InputPort<?>>> streamLinks = new HashMap<>(); @Override public Attribute.AttributeMap getAttributes() @@ -1197,6 +1200,7 @@ public class LogicalPlan implements Serializable, DAG subModuleMeta.setParent(this); subModuleMeta.flattenModule(dag, conf); } + dag.applyStreamLinks(); parentDAG.addDAGToCurrentDAG(this); } @@ -1300,13 +1304,52 @@ public class LogicalPlan implements Serializable, DAG public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks) { StreamMeta s = addStream(id); - s.setSource(source); - for (Operator.InputPort<?> sink: sinks) { - s.addSink(sink); + id = s.id; + ArrayListMultimap<OutputPort<?>, InputPort<?>> streamMap = ArrayListMultimap.create(); + if (!(source instanceof ProxyOutputPort)) { + s.setSource(source); + } + for (Operator.InputPort<?> sink : sinks) { + if (source instanceof ProxyOutputPort || sink instanceof ProxyInputPort) { + streamMap.put(source, sink); + streamLinks.put(id, streamMap); + } else { + if (s.getSource() == null) { + s.setSource(source); + } + s.addSink(sink); + } } return s; } + /** + * This will be called once the Logical Dag is expanded, and the proxy input and proxy output ports are populated with + * the actual ports that they refer to This method adds sources and sinks for the StreamMeta objects which were left + * empty in the addStream call. + */ + public void applyStreamLinks() + { + for (String id : streamLinks.keySet()) { + StreamMeta s = getStream(id); + for (Map.Entry<Operator.OutputPort<?>, Operator.InputPort<?>> pair : streamLinks.get(id).entries()) { + if (s.getSource() == null) { + Operator.OutputPort<?> outputPort = pair.getKey(); + while (outputPort instanceof ProxyOutputPort) { + outputPort = ((ProxyOutputPort<?>)outputPort).get(); + } + s.setSource(outputPort); + } + + Operator.InputPort<?> inputPort = pair.getValue(); + while (inputPort instanceof ProxyInputPort) { + inputPort = ((ProxyInputPort<?>)inputPort).get(); + } + s.addSink(inputPort); + } + } + } + @SuppressWarnings({ "unchecked", "rawtypes" }) private void addDAGToCurrentDAG(ModuleMeta moduleMeta) { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c1314eaf/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java index 6dc4c0c..483576a 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java @@ -2128,6 +2128,7 @@ public class LogicalPlanConfiguration { for (ModuleMeta moduleMeta : dag.getAllModules()) { moduleMeta.flattenModule(dag, conf); } + dag.applyStreamLinks(); } public static Properties readProperties(String filePath) throws IOException http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c1314eaf/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java new file mode 100644 index 0000000..97c015e --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram.plan.logical.module; + +import java.util.Random; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Module; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; + +/** + * Unit tests for testing Dag expansion with modules and proxy port substitution + */ +public class ModuleAppTest +{ + + /* + * Input Operator - 1 + */ + static class DummyInputOperator extends BaseOperator implements InputOperator + { + + Random r = new Random(); + public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>(); + + @Override + public void emitTuples() + { + output.emit(r.nextInt()); + } + } + + /* + * Input Operator - 1.1 + */ + static class DummyOperatorAfterInput extends BaseOperator + { + + public transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>() + { + @Override + public void process(Integer tuple) + { + output.emit(tuple); + } + }; + public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>(); + } + + /* + * Operator - 2 + */ + static class DummyOperator extends BaseOperator + { + int prop; + + public transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>() + { + @Override + public void process(Integer tuple) + { + LOG.debug(tuple.intValue() + " processed"); + output.emit(tuple); + } + }; + public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>(); + } + + /* + * Output Operator - 3 + */ + static class DummyOutputOperator extends BaseOperator + { + int prop; + + public transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>() + { + @Override + public void process(Integer tuple) + { + LOG.debug(tuple.intValue() + " processed"); + } + }; + } + + /* + * Module Definition + */ + static class TestModule implements Module + { + + public transient ProxyInputPort<Integer> moduleInput = new Module.ProxyInputPort<Integer>(); + public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<Integer>(); + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + LOG.debug("Module - PopulateDAG"); + DummyOperator dummyOperator = dag.addOperator("DummyOperator", new DummyOperator()); + moduleInput.set(dummyOperator.input); + moduleOutput.set(dummyOperator.output); + } + } + + static class Application implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + LOG.debug("Application - PopulateDAG"); + DummyInputOperator dummyInputOperator = dag.addOperator("DummyInputOperator", new DummyInputOperator()); + DummyOperatorAfterInput dummyOperatorAfterInput = dag.addOperator("DummyOperatorAfterInput", + new DummyOperatorAfterInput()); + Module m1 = dag.addModule("TestModule1", new TestModule()); + Module m2 = dag.addModule("TestModule2", new TestModule()); + DummyOutputOperator dummyOutputOperator = dag.addOperator("DummyOutputOperator", new DummyOutputOperator()); + dag.addStream("Operator To Operator", dummyInputOperator.output, dummyOperatorAfterInput.input); + dag.addStream("Operator To Module", dummyOperatorAfterInput.output, ((TestModule)m1).moduleInput); + dag.addStream("Module To Module", ((TestModule)m1).moduleOutput, ((TestModule)m2).moduleInput); + dag.addStream("Module To Operator", ((TestModule)m2).moduleOutput, dummyOutputOperator.input); + } + } + + @Test + public void validateTestApplication() + { + Configuration conf = new Configuration(false); + LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf); + LogicalPlan dag = new LogicalPlan(); + lpc.prepareDAG(dag, new Application(), "TestApp"); + + Assert.assertEquals(2, dag.getAllModules().size(), 2); + Assert.assertEquals(5, dag.getAllOperators().size()); + Assert.assertEquals(4, dag.getAllStreams().size()); + dag.validate(); + } + + private static Logger LOG = LoggerFactory.getLogger(ModuleAppTest.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c1314eaf/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java new file mode 100644 index 0000000..5bfd8f1 --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java @@ -0,0 +1,552 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram.plan.logical.module; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Module; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; + +public class TestModuleExpansion +{ + static class DummyInputOperator extends BaseOperator implements InputOperator + { + private int inputOperatorProp = 0; + + Random r = new Random(); + public transient DefaultOutputPort<Integer> out = new DefaultOutputPort<Integer>(); + + @Override + public void emitTuples() + { + out.emit(r.nextInt()); + } + + public int getInputOperatorProp() + { + return inputOperatorProp; + } + + public void setInputOperatorProp(int inputOperatorProp) + { + this.inputOperatorProp = inputOperatorProp; + } + } + + static class DummyOperator extends BaseOperator + { + private int operatorProp = 0; + + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<Integer> out1 = new DefaultOutputPort<>(); + + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<Integer> out2 = new DefaultOutputPort<>(); + + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<Integer> in = new DefaultInputPort<Integer>() + { + @Override + public void process(Integer tuple) + { + out1.emit(tuple); + out2.emit(tuple); + } + }; + + public int getOperatorProp() + { + return operatorProp; + } + + public void setOperatorProp(int operatorProp) + { + this.operatorProp = operatorProp; + } + } + + static class Level1Module implements Module + { + private int level1ModuleProp = 0; + + @InputPortFieldAnnotation(optional = true) + public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>(); + @OutputPortFieldAnnotation(optional = true) + public final transient ProxyOutputPort<Integer> mOut = new ProxyOutputPort<>(); + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + DummyOperator o1 = dag.addOperator("O1", new DummyOperator()); + o1.setOperatorProp(level1ModuleProp); + mIn.set(o1.in); + mOut.set(o1.out1); + } + + public int getLevel1ModuleProp() + { + return level1ModuleProp; + } + + public void setLevel1ModuleProp(int level1ModuleProp) + { + this.level1ModuleProp = level1ModuleProp; + } + } + + static class Level2ModuleA implements Module + { + private int level2ModuleAProp1 = 0; + private int level2ModuleAProp2 = 0; + private int level2ModuleAProp3 = 0; + + @InputPortFieldAnnotation(optional = true) + public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>(); + + @OutputPortFieldAnnotation(optional = true) + public final transient ProxyOutputPort<Integer> mOut1 = new ProxyOutputPort<>(); + + @OutputPortFieldAnnotation(optional = true) + public final transient ProxyOutputPort<Integer> mOut2 = new ProxyOutputPort<>(); + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + Level1Module m1 = dag.addModule("M1", new Level1Module()); + m1.setLevel1ModuleProp(level2ModuleAProp1); + + Level1Module m2 = dag.addModule("M2", new Level1Module()); + m2.setLevel1ModuleProp(level2ModuleAProp2); + + DummyOperator o1 = dag.addOperator("O1", new DummyOperator()); + o1.setOperatorProp(level2ModuleAProp3); + + dag.addStream("M1_M2&O1", m1.mOut, m2.mIn, o1.in); + + mIn.set(m1.mIn); + mOut1.set(m2.mOut); + mOut2.set(o1.out1); + } + + public int getLevel2ModuleAProp1() + { + return level2ModuleAProp1; + } + + public void setLevel2ModuleAProp1(int level2ModuleAProp1) + { + this.level2ModuleAProp1 = level2ModuleAProp1; + } + + public int getLevel2ModuleAProp2() + { + return level2ModuleAProp2; + } + + public void setLevel2ModuleAProp2(int level2ModuleAProp2) + { + this.level2ModuleAProp2 = level2ModuleAProp2; + } + + public int getLevel2ModuleAProp3() + { + return level2ModuleAProp3; + } + + public void setLevel2ModuleAProp3(int level2ModuleAProp3) + { + this.level2ModuleAProp3 = level2ModuleAProp3; + } + } + + static class Level2ModuleB implements Module + { + private int level2ModuleBProp1 = 0; + private int level2ModuleBProp2 = 0; + private int level2ModuleBProp3 = 0; + + @InputPortFieldAnnotation(optional = true) + public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>(); + + @OutputPortFieldAnnotation(optional = true) + public final transient ProxyOutputPort<Integer> mOut1 = new ProxyOutputPort<>(); + + @OutputPortFieldAnnotation(optional = true) + public final transient ProxyOutputPort<Integer> mOut2 = new ProxyOutputPort<>(); + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + DummyOperator o1 = dag.addOperator("O1", new DummyOperator()); + o1.setOperatorProp(level2ModuleBProp1); + + Level1Module m1 = dag.addModule("M1", new Level1Module()); + m1.setLevel1ModuleProp(level2ModuleBProp2); + + DummyOperator o2 = dag.addOperator("O2", new DummyOperator()); + o2.setOperatorProp(level2ModuleBProp3); + + dag.addStream("O1_M1", o1.out1, m1.mIn); + dag.addStream("O1_O2", o1.out2, o2.in); + + mIn.set(o1.in); + mOut1.set(m1.mOut); + mOut2.set(o2.out1); + } + + public int getLevel2ModuleBProp1() + { + return level2ModuleBProp1; + } + + public void setLevel2ModuleBProp1(int level2ModuleBProp1) + { + this.level2ModuleBProp1 = level2ModuleBProp1; + } + + public int getLevel2ModuleBProp2() + { + return level2ModuleBProp2; + } + + public void setLevel2ModuleBProp2(int level2ModuleBProp2) + { + this.level2ModuleBProp2 = level2ModuleBProp2; + } + + public int getLevel2ModuleBProp3() + { + return level2ModuleBProp3; + } + + public void setLevel2ModuleBProp3(int level2ModuleBProp3) + { + this.level2ModuleBProp3 = level2ModuleBProp3; + } + } + + static class Level3Module implements Module + { + + public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>(); + public final transient ProxyOutputPort<Integer> mOut1 = new ProxyOutputPort<>(); + public final transient ProxyOutputPort<Integer> mOut2 = new ProxyOutputPort<>(); + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + DummyOperator op = dag.addOperator("O1", new DummyOperator()); + Level2ModuleB m1 = dag.addModule("M1", new Level2ModuleB()); + Level1Module m2 = dag.addModule("M2", new Level1Module()); + + dag.addStream("s1", op.out1, m1.mIn); + dag.addStream("s2", op.out2, m2.mIn); + + mIn.set(op.in); + mOut1.set(m1.mOut1); + mOut2.set(m2.mOut); + } + } + + static class NestedModuleApp implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + DummyInputOperator o1 = dag.addOperator("O1", new DummyInputOperator()); + o1.setInputOperatorProp(1); + + DummyOperator o2 = dag.addOperator("O2", new DummyOperator()); + o2.setOperatorProp(2); + + Level2ModuleA ma = dag.addModule("Ma", new Level2ModuleA()); + ma.setLevel2ModuleAProp1(11); + ma.setLevel2ModuleAProp2(12); + ma.setLevel2ModuleAProp3(13); + + Level2ModuleB mb = dag.addModule("Mb", new Level2ModuleB()); + mb.setLevel2ModuleBProp1(21); + mb.setLevel2ModuleBProp2(22); + mb.setLevel2ModuleBProp3(23); + + Level2ModuleA mc = dag.addModule("Mc", new Level2ModuleA()); + mc.setLevel2ModuleAProp1(31); + mc.setLevel2ModuleAProp2(32); + mc.setLevel2ModuleAProp3(33); + + Level2ModuleB md = dag.addModule("Md", new Level2ModuleB()); + md.setLevel2ModuleBProp1(41); + md.setLevel2ModuleBProp2(42); + md.setLevel2ModuleBProp3(43); + + Level3Module me = dag.addModule("Me", new Level3Module()); + + dag.addStream("O1_O2", o1.out, o2.in, me.mIn); + dag.addStream("O2_Ma", o2.out1, ma.mIn); + dag.addStream("Ma_Mb", ma.mOut1, mb.mIn); + dag.addStream("Ma_Md", ma.mOut2, md.mIn); + dag.addStream("Mb_Mc", mb.mOut2, mc.mIn); + } + } + + @Test + public void testModuleExtreme() + { + StreamingApplication app = new NestedModuleApp(); + Configuration conf = new Configuration(false); + LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf); + LogicalPlan dag = new LogicalPlan(); + lpc.prepareDAG(dag, app, "ModuleApp"); + + dag.validate(); + validateTopLevelOperators(dag); + validateTopLevelStreams(dag); + validatePublicMethods(dag); + } + + private void validateTopLevelStreams(LogicalPlan dag) + { + List<String> streamNames = new ArrayList<>(); + for (LogicalPlan.StreamMeta streamMeta : dag.getAllStreams()) { + streamNames.add(streamMeta.getName()); + } + + Assert.assertTrue(streamNames.contains(componentName("Mb", "O1_M1"))); + Assert.assertTrue(streamNames.contains("O2_Ma")); + Assert.assertTrue(streamNames.contains("Mb_Mc")); + Assert.assertTrue(streamNames.contains(componentName("Mb", "O1_O2"))); + Assert.assertTrue(streamNames.contains(componentName("Ma", "M1_M2&O1"))); + Assert.assertTrue(streamNames.contains(componentName("Md", "O1_M1"))); + Assert.assertTrue(streamNames.contains(componentName("Ma_Md"))); + Assert.assertTrue(streamNames.contains(componentName("Mc", "M1_M2&O1"))); + Assert.assertTrue(streamNames.contains(componentName("Md", "O1_O2"))); + Assert.assertTrue(streamNames.contains("Ma_Mb")); + Assert.assertTrue(streamNames.contains("O1_O2")); + + validateSeperateStream(dag, componentName("Mb", "O1_M1"), componentName("Mb", "O1"), + componentName("Mb", "M1", "O1")); + validateSeperateStream(dag, "O2_Ma", "O2", componentName("Ma", "M1", "O1")); + validateSeperateStream(dag, "Mb_Mc", componentName("Mb", "O2"), componentName("Mc", "M1", "O1")); + validateSeperateStream(dag, componentName("Mb", "O1_O2"), componentName("Mb", "O1"), componentName("Mb", "O2")); + validateSeperateStream(dag, componentName("Ma", "M1_M2&O1"), componentName("Ma", "M1", "O1"), + componentName("Ma", "O1"), componentName("Ma", "M2", "O1")); + validateSeperateStream(dag, componentName("Md", "O1_M1"), componentName("Md", "O1"), + componentName("Md", "M1", "O1")); + validateSeperateStream(dag, "Ma_Md", componentName("Ma", "O1"), componentName("Md", "O1")); + validateSeperateStream(dag, componentName("Mc", "M1_M2&O1"), componentName("Mc", "M1", "O1"), + componentName("Mc", "O1"), componentName("Mc", "M2", "O1")); + validateSeperateStream(dag, componentName("Md", "O1_O2"), componentName("Md", "O1"), componentName("Md", "O2")); + validateSeperateStream(dag, "Ma_Mb", componentName("Ma", "M2", "O1"), componentName("Mb", "O1")); + validateSeperateStream(dag, "O1_O2", "O1", "O2", componentName("Me", "O1")); + } + + private void validateSeperateStream(LogicalPlan dag, String streamName, String inputOperatorName, + String... outputOperatorNames) + { + LogicalPlan.StreamMeta streamMeta = dag.getStream(streamName); + String sourceName = streamMeta.getSource().getOperatorMeta().getName(); + + List<String> sinksName = new ArrayList<>(); + for (LogicalPlan.InputPortMeta inputPortMeta : streamMeta.getSinks()) { + sinksName.add(inputPortMeta.getOperatorWrapper().getName()); + } + + Assert.assertTrue(inputOperatorName.equals(sourceName)); + Assert.assertEquals(outputOperatorNames.length, sinksName.size()); + + for (String outputOperatorName : outputOperatorNames) { + Assert.assertTrue(sinksName.contains(outputOperatorName)); + } + } + + private void validateTopLevelOperators(LogicalPlan dag) + { + List<String> operatorNames = new ArrayList<>(); + for (LogicalPlan.OperatorMeta operatorMeta : dag.getAllOperators()) { + operatorNames.add(operatorMeta.getName()); + } + Assert.assertTrue(operatorNames.contains("O1")); + Assert.assertTrue(operatorNames.contains("O2")); + Assert.assertTrue(operatorNames.contains(componentName("Ma", "M1", "O1"))); + Assert.assertTrue(operatorNames.contains(componentName("Ma", "M2", "O1"))); + Assert.assertTrue(operatorNames.contains(componentName("Ma", "O1"))); + Assert.assertTrue(operatorNames.contains(componentName("Mb", "O1"))); + Assert.assertTrue(operatorNames.contains(componentName("Mb", "M1", "O1"))); + Assert.assertTrue(operatorNames.contains(componentName("Mb", "O2"))); + Assert.assertTrue(operatorNames.contains(componentName("Mc", "M1", "O1"))); + Assert.assertTrue(operatorNames.contains(componentName("Mc", "M2", "O1"))); + Assert.assertTrue(operatorNames.contains(componentName("Mc", "O1"))); + Assert.assertTrue(operatorNames.contains(componentName("Md", "O1"))); + Assert.assertTrue(operatorNames.contains(componentName("Md", "M1", "O1"))); + Assert.assertTrue(operatorNames.contains(componentName("Md", "O2"))); + + validateOperatorPropertyValue(dag, "O1", 1); + validateOperatorPropertyValue(dag, "O2", 2); + validateOperatorPropertyValue(dag, componentName("Ma", "M1", "O1"), 11); + validateOperatorPropertyValue(dag, componentName("Ma", "M2", "O1"), 12); + validateOperatorPropertyValue(dag, componentName("Ma", "O1"), 13); + validateOperatorPropertyValue(dag, componentName("Mb", "O1"), 21); + validateOperatorPropertyValue(dag, componentName("Mb", "M1", "O1"), 22); + validateOperatorPropertyValue(dag, componentName("Mb", "O2"), 23); + validateOperatorPropertyValue(dag, componentName("Mc", "M1", "O1"), 31); + validateOperatorPropertyValue(dag, componentName("Mc", "M2", "O1"), 32); + validateOperatorPropertyValue(dag, componentName("Mc", "O1"), 33); + validateOperatorPropertyValue(dag, componentName("Md", "O1"), 41); + validateOperatorPropertyValue(dag, componentName("Md", "M1", "O1"), 42); + validateOperatorPropertyValue(dag, componentName("Md", "O2"), 43); + + validateOperatorParent(dag, "O1", null); + validateOperatorParent(dag, "O2", null); + validateOperatorParent(dag, componentName("Ma", "M1", "O1"), componentName("Ma", "M1")); + validateOperatorParent(dag, componentName("Ma", "M2", "O1"), componentName("Ma", "M2")); + validateOperatorParent(dag, componentName("Ma", "O1"), "Ma"); + validateOperatorParent(dag, componentName("Mb", "O1"), "Mb"); + validateOperatorParent(dag, componentName("Mb", "M1", "O1"), componentName("Mb", "M1")); + validateOperatorParent(dag, componentName("Mb", "O2"), "Mb"); + validateOperatorParent(dag, componentName("Mc", "M1", "O1"), componentName("Mc", "M1")); + validateOperatorParent(dag, componentName("Mc", "M2", "O1"), componentName("Mc", "M2")); + validateOperatorParent(dag, componentName("Mc", "O1"), "Mc"); + validateOperatorParent(dag, componentName("Md", "O1"), "Md"); + validateOperatorParent(dag, componentName("Md", "M1", "O1"), componentName("Md", "M1")); + validateOperatorParent(dag, componentName("Md", "O2"), "Md"); + } + + private void validateOperatorParent(LogicalPlan dag, String operatorName, String parentModuleName) + { + LogicalPlan.OperatorMeta operatorMeta = dag.getOperatorMeta(operatorName); + if (parentModuleName == null) { + Assert.assertNull(operatorMeta.getModuleName()); + } else { + Assert.assertTrue(parentModuleName.equals(operatorMeta.getModuleName())); + } + } + + private void validateOperatorPropertyValue(LogicalPlan dag, String operatorName, int expectedValue) + { + LogicalPlan.OperatorMeta oMeta = dag.getOperatorMeta(operatorName); + if (operatorName.equals("O1")) { + DummyInputOperator operator = (DummyInputOperator)oMeta.getOperator(); + Assert.assertEquals(expectedValue, operator.getInputOperatorProp()); + } else { + DummyOperator operator = (DummyOperator)oMeta.getOperator(); + Assert.assertEquals(expectedValue, operator.getOperatorProp()); + } + } + + private void validatePublicMethods(LogicalPlan dag) + { + // Logical dag contains 4 modules added on top level. + List<String> moduleNames = new ArrayList<>(); + for (LogicalPlan.ModuleMeta moduleMeta : dag.getAllModules()) { + moduleNames.add(moduleMeta.getName()); + } + Assert.assertTrue(moduleNames.contains("Ma")); + Assert.assertTrue(moduleNames.contains("Mb")); + Assert.assertTrue(moduleNames.contains("Mc")); + Assert.assertTrue(moduleNames.contains("Md")); + Assert.assertTrue(moduleNames.contains("Me")); + Assert.assertEquals("Number of modules are 5", 5, dag.getAllModules().size()); + + // correct module meta is returned by getMeta call. + LogicalPlan.ModuleMeta m = dag.getModuleMeta("Ma"); + Assert.assertEquals("Name of module is Ma", m.getName(), "Ma"); + + } + + private static String componentName(String... names) + { + if (names.length == 0) { + return ""; + } + StringBuilder sb = new StringBuilder(names[0]); + for (int i = 1; i < names.length; i++) { + sb.append(LogicalPlan.MODULE_NAMESPACE_SEPARATOR); + sb.append(names[i]); + } + return sb.toString(); + } + + /** + * Generate a conflict, Add a top level operator with name "m1_O1", + * and add a module "m1" which will populate operator "O1", causing name conflict with + * top level operator. + */ + @Test(expected = java.lang.IllegalArgumentException.class) + public void conflictingNamesWithExpandedModule() + { + Configuration conf = new Configuration(false); + LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf); + LogicalPlan dag = new LogicalPlan(); + DummyInputOperator in = dag.addOperator(componentName("m1", "O1"), new DummyInputOperator()); + Level2ModuleA module = dag.addModule("m1", new Level2ModuleA()); + dag.addStream("s1", in.out, module.mIn); + lpc.prepareDAG(dag, null, "ModuleApp"); + dag.validate(); + } + + /** + * Module and Operator with same name is not allowed in a DAG, to prevent properties + * conflict. + */ + @Test(expected = java.lang.IllegalArgumentException.class) + public void conflictingNamesWithOperator1() + { + Configuration conf = new Configuration(false); + LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf); + LogicalPlan dag = new LogicalPlan(); + DummyInputOperator in = dag.addOperator("M1", new DummyInputOperator()); + Level2ModuleA module = dag.addModule("M1", new Level2ModuleA()); + dag.addStream("s1", in.out, module.mIn); + lpc.prepareDAG(dag, null, "ModuleApp"); + dag.validate(); + } + + /** + * Module and Operator with same name is not allowed in a DAG, to prevent properties + * conflict. + */ + @Test(expected = java.lang.IllegalArgumentException.class) + public void conflictingNamesWithOperator2() + { + Configuration conf = new Configuration(false); + LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf); + LogicalPlan dag = new LogicalPlan(); + Level2ModuleA module = dag.addModule("M1", new Level2ModuleA()); + DummyInputOperator in = dag.addOperator("M1", new DummyInputOperator()); + dag.addStream("s1", in.out, module.mIn); + lpc.prepareDAG(dag, null, "ModuleApp"); + dag.validate(); + } +}
