APEXCORE-272 copy operator and port attributes from module dag to parent dag.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/be075b81 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/be075b81 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/be075b81 Branch: refs/heads/master Commit: be075b81d5f5a7b4d5b556cfa3c5f28f349333f5 Parents: 3f76dcb Author: Tushar R. Gosavi <[email protected]> Authored: Wed Dec 23 22:01:50 2015 +0530 Committer: Tushar R. Gosavi <[email protected]> Committed: Thu Dec 31 12:00:29 2015 +0530 ---------------------------------------------------------------------- .../stram/plan/logical/LogicalPlan.java | 47 ++++++- .../logical/module/TestModuleExpansion.java | 128 ++++++++++++++++++- 2 files changed, 169 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/be075b81/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 347e94f..867f814 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -910,6 +910,45 @@ public class LogicalPlan implements Serializable, DAG getValue(OperatorContext.METRICS_DIMENSIONS_SCHEME)); } + /** + * Copy attribute from source attributeMap to destination attributeMap. + * + * @param dest destination attribute map. + * @param source source attribute map. + */ + private void copyAttributes(AttributeMap dest, AttributeMap source) + { + for (Entry<Attribute<?>, ?> a : source.entrySet()) { + dest.put((Attribute<Object>)a.getKey(), a.getValue()); + } + } + + /** + * Copy attribute of operator and port from provided operatorMeta. This function requires + * operatorMeta argument is for the same operator. + * + * @param operatorMeta copy attribute from this OperatorMeta to the object. + */ + private void copyAttributesFrom(OperatorMeta operatorMeta) + { + if (operator != operatorMeta.getOperator()) { + throw new IllegalArgumentException("Operator meta is not for the same operator "); + } + + // copy operator attributes + copyAttributes(attributes, operatorMeta.getAttributes()); + + // copy Input port attributes + for (Map.Entry<InputPort<?>, InputPortMeta> entry : operatorMeta.getPortMapping().inPortMap.entrySet()) { + copyAttributes(getPortMapping().inPortMap.get(entry.getKey()).attributes, entry.getValue().attributes); + } + + // copy Output port attributes + for (Map.Entry<OutputPort<?>, OutputPortMeta> entry : operatorMeta.getPortMapping().outPortMap.entrySet()) { + copyAttributes(getPortMapping().outPortMap.get(entry.getKey()).attributes, entry.getValue().attributes); + } + } + private class PortMapping implements Operators.OperatorDescriptor { private final Map<Operator.InputPort<?>, InputPortMeta> inPortMap = new HashMap<Operator.InputPort<?>, InputPortMeta>(); @@ -1362,8 +1401,9 @@ public class LogicalPlan implements Serializable, DAG String name; for (OperatorMeta operatorMeta : subDag.getAllOperators()) { name = subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName(); - this.addOperator(name, operatorMeta.getOperator()); - OperatorMeta operatorMetaNew = this.getOperatorMeta(name); + Operator op = this.addOperator(name, operatorMeta.getOperator()); + OperatorMeta operatorMetaNew = this.getMeta(op); + operatorMetaNew.copyAttributesFrom(operatorMeta); operatorMetaNew.setModuleName(operatorMeta.getModuleName() == null ? subDAGName : subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getModuleName()); } @@ -1377,7 +1417,8 @@ public class LogicalPlan implements Serializable, DAG InputPort[] inputPorts = ports.toArray(new InputPort[]{}); name = subDAGName + MODULE_NAMESPACE_SEPARATOR + streamMeta.getName(); - this.addStream(name, sourceMeta.getPortObject(), inputPorts); + StreamMeta streamMetaNew = this.addStream(name, sourceMeta.getPortObject(), inputPorts); + streamMetaNew.setLocality(streamMeta.getLocality()); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/be075b81/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java index 5bfd8f1..d5af67b 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java @@ -18,8 +18,11 @@ */ package com.datatorrent.stram.plan.logical.module; +import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Random; import org.junit.Assert; @@ -27,15 +30,19 @@ import org.junit.Test; import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; import com.datatorrent.api.DAG; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.InputOperator; import com.datatorrent.api.Module; +import com.datatorrent.api.Partitioner; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.stram.engine.OperatorContext; import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; @@ -97,6 +104,23 @@ public class TestModuleExpansion } } + static class TestPartitioner implements Partitioner<DummyOperator>, Serializable + { + @Override + public Collection<Partition<DummyOperator>> definePartitions(Collection<Partition<DummyOperator>> partitions, PartitioningContext context) + { + ArrayList<Partition<DummyOperator>> lst = new ArrayList(); + lst.add(partitions.iterator().next()); + return lst; + } + + @Override + public void partitioned(Map<Integer, Partition<DummyOperator>> partitions) + { + + } + } + static class Level1Module implements Module { private int level1ModuleProp = 0; @@ -105,12 +129,26 @@ public class TestModuleExpansion public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>(); @OutputPortFieldAnnotation(optional = true) public final transient ProxyOutputPort<Integer> mOut = new ProxyOutputPort<>(); + private int memory = 512; + private int portMemory = 2; @Override public void populateDAG(DAG dag, Configuration conf) { DummyOperator o1 = dag.addOperator("O1", new DummyOperator()); o1.setOperatorProp(level1ModuleProp); + + /** set various attribute on the operator for testing */ + Attribute.AttributeMap attr = dag.getMeta(o1).getAttributes(); + attr.put(OperatorContext.MEMORY_MB, memory); + attr.put(OperatorContext.APPLICATION_WINDOW_COUNT, 2); + attr.put(OperatorContext.LOCALITY_HOST, "host1"); + attr.put(OperatorContext.PARTITIONER, new TestPartitioner()); + attr.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 120); + attr.put(OperatorContext.STATELESS, true); + attr.put(OperatorContext.SPIN_MILLIS, 20); + + dag.setInputPortAttribute(o1.in, Context.PortContext.BUFFER_MEMORY_MB, portMemory); mIn.set(o1.in); mOut.set(o1.out1); } @@ -124,6 +162,26 @@ public class TestModuleExpansion { this.level1ModuleProp = level1ModuleProp; } + + public int getMemory() + { + return memory; + } + + public void setMemory(int memory) + { + this.memory = memory; + } + + public int getPortMemory() + { + return portMemory; + } + + public void setPortMemory(int portMemory) + { + this.portMemory = portMemory; + } } static class Level2ModuleA implements Module @@ -145,15 +203,19 @@ public class TestModuleExpansion public void populateDAG(DAG dag, Configuration conf) { Level1Module m1 = dag.addModule("M1", new Level1Module()); + m1.setMemory(1024); + m1.setPortMemory(1); m1.setLevel1ModuleProp(level2ModuleAProp1); Level1Module m2 = dag.addModule("M2", new Level1Module()); + m2.setMemory(2048); + m2.setPortMemory(2); m2.setLevel1ModuleProp(level2ModuleAProp2); DummyOperator o1 = dag.addOperator("O1", new DummyOperator()); o1.setOperatorProp(level2ModuleAProp3); - dag.addStream("M1_M2&O1", m1.mOut, m2.mIn, o1.in); + dag.addStream("M1_M2&O1", m1.mOut, m2.mIn, o1.in).setLocality(DAG.Locality.CONTAINER_LOCAL); mIn.set(m1.mIn); mOut1.set(m2.mOut); @@ -213,13 +275,15 @@ public class TestModuleExpansion o1.setOperatorProp(level2ModuleBProp1); Level1Module m1 = dag.addModule("M1", new Level1Module()); + m1.setMemory(4096); + m1.setPortMemory(3); m1.setLevel1ModuleProp(level2ModuleBProp2); DummyOperator o2 = dag.addOperator("O2", new DummyOperator()); o2.setOperatorProp(level2ModuleBProp3); - dag.addStream("O1_M1", o1.out1, m1.mIn); - dag.addStream("O1_O2", o1.out2, o2.in); + dag.addStream("O1_M1", o1.out1, m1.mIn).setLocality(DAG.Locality.THREAD_LOCAL); + dag.addStream("O1_O2", o1.out2, o2.in).setLocality(DAG.Locality.RACK_LOCAL); mIn.set(o1.in); mOut1.set(m1.mOut); @@ -370,6 +434,15 @@ public class TestModuleExpansion validateSeperateStream(dag, componentName("Md", "O1_O2"), componentName("Md", "O1"), componentName("Md", "O2")); validateSeperateStream(dag, "Ma_Mb", componentName("Ma", "M2", "O1"), componentName("Mb", "O1")); validateSeperateStream(dag, "O1_O2", "O1", "O2", componentName("Me", "O1")); + + /* Verify that stream locality is set correctly in top level dag */ + validateStreamLocality(dag, componentName("Mc", "M1_M2&O1"), DAG.Locality.CONTAINER_LOCAL); + validateStreamLocality(dag, componentName("Mb", "O1_M1"), DAG.Locality.THREAD_LOCAL); + validateStreamLocality(dag, componentName("Mb", "O1_O2"), DAG.Locality.RACK_LOCAL); + validateStreamLocality(dag, componentName("Mc", "M1_M2&O1"), DAG.Locality.CONTAINER_LOCAL); + validateStreamLocality(dag, componentName("Md", "O1_M1"), DAG.Locality.THREAD_LOCAL); + validateStreamLocality(dag, componentName("Me", "s1"), null); + } private void validateSeperateStream(LogicalPlan dag, String streamName, String inputOperatorName, @@ -441,6 +514,18 @@ public class TestModuleExpansion validateOperatorParent(dag, componentName("Md", "O1"), "Md"); validateOperatorParent(dag, componentName("Md", "M1", "O1"), componentName("Md", "M1")); validateOperatorParent(dag, componentName("Md", "O2"), "Md"); + + validateOperatorAttribute(dag, componentName("Ma", "M1", "O1"), 1024); + validateOperatorAttribute(dag, componentName("Ma", "M2", "O1"), 2048); + validateOperatorAttribute(dag, componentName("Mb", "M1", "O1"), 4096); + validateOperatorAttribute(dag, componentName("Mc", "M1", "O1"), 1024); + validateOperatorAttribute(dag, componentName("Mc", "M2", "O1"), 2048); + + validatePortAttribute(dag, componentName("Ma", "M1", "O1"), 1); + validatePortAttribute(dag, componentName("Ma", "M2", "O1"), 2); + validatePortAttribute(dag, componentName("Mb", "M1", "O1"), 3); + validatePortAttribute(dag, componentName("Mc", "M1", "O1"), 1); + validatePortAttribute(dag, componentName("Mc", "M2", "O1"), 2); } private void validateOperatorParent(LogicalPlan dag, String operatorName, String parentModuleName) @@ -549,4 +634,41 @@ public class TestModuleExpansion lpc.prepareDAG(dag, null, "ModuleApp"); dag.validate(); } + + /** + * Verify attributes populated on DummyOperator from Level1 module + */ + private void validateOperatorAttribute(LogicalPlan dag, String name, int memory) + { + LogicalPlan.OperatorMeta oMeta = dag.getOperatorMeta(name); + Attribute.AttributeMap attrs = oMeta.getAttributes(); + Assert.assertEquals((int)attrs.get(OperatorContext.MEMORY_MB), memory); + Assert.assertEquals("Application window id is 2 ", (int)attrs.get(OperatorContext.APPLICATION_WINDOW_COUNT), 2); + Assert.assertEquals("Locality host is host1", attrs.get(OperatorContext.LOCALITY_HOST), "host1"); + Assert.assertEquals(attrs.get(OperatorContext.PARTITIONER).getClass(), TestPartitioner.class); + Assert.assertEquals("Checkpoint window count ", (int)attrs.get(OperatorContext.CHECKPOINT_WINDOW_COUNT), 120); + Assert.assertEquals("Operator is stateless ", attrs.get(OperatorContext.STATELESS), true); + Assert.assertEquals("SPIN MILLIS is set to 20 ", (int)attrs.get(OperatorContext.SPIN_MILLIS), 20); + + } + + /** + * Validate attribute set on the port of DummyOperator in Level1Module + */ + private void validatePortAttribute(LogicalPlan dag, String name, int memory) + { + LogicalPlan.InputPortMeta imeta = dag.getOperatorMeta(name).getInputStreams().keySet().iterator().next(); + Assert.assertEquals(memory, (int)imeta.getAttributes().get(Context.PortContext.BUFFER_MEMORY_MB)); + } + + /** + * validate if stream attributes are copied or not + */ + private void validateStreamLocality(LogicalPlan dag, String name, DAG.Locality locality) + { + LogicalPlan.StreamMeta meta = dag.getStream(name); + Assert.assertTrue("Metadata for stream is available ", meta != null); + Assert.assertEquals("Locality is " + locality, meta.getLocality(), locality); + } + }
