APEXCORE-272 copy operator and port attributes from module dag to parent dag.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/be075b81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/be075b81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/be075b81

Branch: refs/heads/master
Commit: be075b81d5f5a7b4d5b556cfa3c5f28f349333f5
Parents: 3f76dcb
Author: Tushar R. Gosavi <[email protected]>
Authored: Wed Dec 23 22:01:50 2015 +0530
Committer: Tushar R. Gosavi <[email protected]>
Committed: Thu Dec 31 12:00:29 2015 +0530

----------------------------------------------------------------------
 .../stram/plan/logical/LogicalPlan.java         |  47 ++++++-
 .../logical/module/TestModuleExpansion.java     | 128 ++++++++++++++++++-
 2 files changed, 169 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/be075b81/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 347e94f..867f814 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -910,6 +910,45 @@ public class LogicalPlan implements Serializable, DAG
         getValue(OperatorContext.METRICS_DIMENSIONS_SCHEME));
     }
 
+    /**
+     * Copy all attributes from the source attribute map to the destination attribute map.
+     *
+     * @param dest  destination attribute map.
+     * @param source source attribute map.
+     */
+    private void copyAttributes(AttributeMap dest, AttributeMap source)
+    {
+      for (Entry<Attribute<?>, ?> a : source.entrySet()) {
+        dest.put((Attribute<Object>)a.getKey(), a.getValue());
+      }
+    }
+
+    /**
+     * Copy operator and port attributes from the provided operatorMeta. This function
+     * requires that the operatorMeta argument refers to the same operator.
+     *
+     * @param operatorMeta the OperatorMeta whose attributes are copied into this object.
+     */
+    private void copyAttributesFrom(OperatorMeta operatorMeta)
+    {
+      if (operator != operatorMeta.getOperator()) {
+        throw new IllegalArgumentException("Operator meta is not for the same 
operator ");
+      }
+
+      // copy operator attributes
+      copyAttributes(attributes, operatorMeta.getAttributes());
+
+      // copy Input port attributes
+      for (Map.Entry<InputPort<?>, InputPortMeta> entry : 
operatorMeta.getPortMapping().inPortMap.entrySet()) {
+        
copyAttributes(getPortMapping().inPortMap.get(entry.getKey()).attributes, 
entry.getValue().attributes);
+      }
+
+      // copy Output port attributes
+      for (Map.Entry<OutputPort<?>, OutputPortMeta> entry : 
operatorMeta.getPortMapping().outPortMap.entrySet()) {
+        
copyAttributes(getPortMapping().outPortMap.get(entry.getKey()).attributes, 
entry.getValue().attributes);
+      }
+    }
+
     private class PortMapping implements Operators.OperatorDescriptor
     {
       private final Map<Operator.InputPort<?>, InputPortMeta> inPortMap = new 
HashMap<Operator.InputPort<?>, InputPortMeta>();
@@ -1362,8 +1401,9 @@ public class LogicalPlan implements Serializable, DAG
     String name;
     for (OperatorMeta operatorMeta : subDag.getAllOperators()) {
       name = subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName();
-      this.addOperator(name, operatorMeta.getOperator());
-      OperatorMeta operatorMetaNew = this.getOperatorMeta(name);
+      Operator op = this.addOperator(name, operatorMeta.getOperator());
+      OperatorMeta operatorMetaNew = this.getMeta(op);
+      operatorMetaNew.copyAttributesFrom(operatorMeta);
       operatorMetaNew.setModuleName(operatorMeta.getModuleName() == null ? 
subDAGName :
           subDAGName + MODULE_NAMESPACE_SEPARATOR + 
operatorMeta.getModuleName());
     }
@@ -1377,7 +1417,8 @@ public class LogicalPlan implements Serializable, DAG
       InputPort[] inputPorts = ports.toArray(new InputPort[]{});
 
       name = subDAGName + MODULE_NAMESPACE_SEPARATOR + streamMeta.getName();
-      this.addStream(name, sourceMeta.getPortObject(), inputPorts);
+      StreamMeta streamMetaNew = this.addStream(name, 
sourceMeta.getPortObject(), inputPorts);
+      streamMetaNew.setLocality(streamMeta.getLocality());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/be075b81/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
index 5bfd8f1..d5af67b 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
@@ -18,8 +18,11 @@
  */
 package com.datatorrent.stram.plan.logical.module;
 
+import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import org.junit.Assert;
@@ -27,15 +30,19 @@ import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
 
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Module;
+import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.stram.engine.OperatorContext;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
 
@@ -97,6 +104,23 @@ public class TestModuleExpansion
     }
   }
 
+  static class TestPartitioner implements Partitioner<DummyOperator>, 
Serializable
+  {
+    @Override
+    public Collection<Partition<DummyOperator>> 
definePartitions(Collection<Partition<DummyOperator>> partitions, 
PartitioningContext context)
+    {
+      ArrayList<Partition<DummyOperator>> lst = new ArrayList();
+      lst.add(partitions.iterator().next());
+      return lst;
+    }
+
+    @Override
+    public void partitioned(Map<Integer, Partition<DummyOperator>> partitions)
+    {
+
+    }
+  }
+
   static class Level1Module implements Module
   {
     private int level1ModuleProp = 0;
@@ -105,12 +129,26 @@ public class TestModuleExpansion
     public final transient ProxyInputPort<Integer> mIn = new 
ProxyInputPort<>();
     @OutputPortFieldAnnotation(optional = true)
     public final transient ProxyOutputPort<Integer> mOut = new 
ProxyOutputPort<>();
+    private int memory = 512;
+    private int portMemory = 2;
 
     @Override
     public void populateDAG(DAG dag, Configuration conf)
     {
       DummyOperator o1 = dag.addOperator("O1", new DummyOperator());
       o1.setOperatorProp(level1ModuleProp);
+
+      /** Set various attributes on the operator for testing. */
+      Attribute.AttributeMap attr = dag.getMeta(o1).getAttributes();
+      attr.put(OperatorContext.MEMORY_MB, memory);
+      attr.put(OperatorContext.APPLICATION_WINDOW_COUNT, 2);
+      attr.put(OperatorContext.LOCALITY_HOST, "host1");
+      attr.put(OperatorContext.PARTITIONER, new TestPartitioner());
+      attr.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 120);
+      attr.put(OperatorContext.STATELESS, true);
+      attr.put(OperatorContext.SPIN_MILLIS, 20);
+
+      dag.setInputPortAttribute(o1.in, Context.PortContext.BUFFER_MEMORY_MB, 
portMemory);
       mIn.set(o1.in);
       mOut.set(o1.out1);
     }
@@ -124,6 +162,26 @@ public class TestModuleExpansion
     {
       this.level1ModuleProp = level1ModuleProp;
     }
+
+    public int getMemory()
+    {
+      return memory;
+    }
+
+    public void setMemory(int memory)
+    {
+      this.memory = memory;
+    }
+
+    public int getPortMemory()
+    {
+      return portMemory;
+    }
+
+    public void setPortMemory(int portMemory)
+    {
+      this.portMemory = portMemory;
+    }
   }
 
   static class Level2ModuleA implements Module
@@ -145,15 +203,19 @@ public class TestModuleExpansion
     public void populateDAG(DAG dag, Configuration conf)
     {
       Level1Module m1 = dag.addModule("M1", new Level1Module());
+      m1.setMemory(1024);
+      m1.setPortMemory(1);
       m1.setLevel1ModuleProp(level2ModuleAProp1);
 
       Level1Module m2 = dag.addModule("M2", new Level1Module());
+      m2.setMemory(2048);
+      m2.setPortMemory(2);
       m2.setLevel1ModuleProp(level2ModuleAProp2);
 
       DummyOperator o1 = dag.addOperator("O1", new DummyOperator());
       o1.setOperatorProp(level2ModuleAProp3);
 
-      dag.addStream("M1_M2&O1", m1.mOut, m2.mIn, o1.in);
+      dag.addStream("M1_M2&O1", m1.mOut, m2.mIn, 
o1.in).setLocality(DAG.Locality.CONTAINER_LOCAL);
 
       mIn.set(m1.mIn);
       mOut1.set(m2.mOut);
@@ -213,13 +275,15 @@ public class TestModuleExpansion
       o1.setOperatorProp(level2ModuleBProp1);
 
       Level1Module m1 = dag.addModule("M1", new Level1Module());
+      m1.setMemory(4096);
+      m1.setPortMemory(3);
       m1.setLevel1ModuleProp(level2ModuleBProp2);
 
       DummyOperator o2 = dag.addOperator("O2", new DummyOperator());
       o2.setOperatorProp(level2ModuleBProp3);
 
-      dag.addStream("O1_M1", o1.out1, m1.mIn);
-      dag.addStream("O1_O2", o1.out2, o2.in);
+      dag.addStream("O1_M1", o1.out1, 
m1.mIn).setLocality(DAG.Locality.THREAD_LOCAL);
+      dag.addStream("O1_O2", o1.out2, 
o2.in).setLocality(DAG.Locality.RACK_LOCAL);
 
       mIn.set(o1.in);
       mOut1.set(m1.mOut);
@@ -370,6 +434,15 @@ public class TestModuleExpansion
     validateSeperateStream(dag, componentName("Md", "O1_O2"), 
componentName("Md", "O1"), componentName("Md", "O2"));
     validateSeperateStream(dag, "Ma_Mb", componentName("Ma", "M2", "O1"), 
componentName("Mb", "O1"));
     validateSeperateStream(dag, "O1_O2", "O1", "O2", componentName("Me", 
"O1"));
+
+    /* Verify that stream locality is set correctly in top level dag */
+    validateStreamLocality(dag, componentName("Mc", "M1_M2&O1"), 
DAG.Locality.CONTAINER_LOCAL);
+    validateStreamLocality(dag, componentName("Mb", "O1_M1"), 
DAG.Locality.THREAD_LOCAL);
+    validateStreamLocality(dag, componentName("Mb", "O1_O2"), 
DAG.Locality.RACK_LOCAL);
+    validateStreamLocality(dag, componentName("Mc", "M1_M2&O1"), 
DAG.Locality.CONTAINER_LOCAL);
+    validateStreamLocality(dag, componentName("Md", "O1_M1"), 
DAG.Locality.THREAD_LOCAL);
+    validateStreamLocality(dag, componentName("Me", "s1"), null);
+
   }
 
   private void validateSeperateStream(LogicalPlan dag, String streamName, 
String inputOperatorName,
@@ -441,6 +514,18 @@ public class TestModuleExpansion
     validateOperatorParent(dag, componentName("Md", "O1"), "Md");
     validateOperatorParent(dag, componentName("Md", "M1", "O1"), 
componentName("Md", "M1"));
     validateOperatorParent(dag, componentName("Md", "O2"), "Md");
+
+    validateOperatorAttribute(dag, componentName("Ma", "M1", "O1"), 1024);
+    validateOperatorAttribute(dag, componentName("Ma", "M2", "O1"), 2048);
+    validateOperatorAttribute(dag, componentName("Mb", "M1", "O1"), 4096);
+    validateOperatorAttribute(dag, componentName("Mc", "M1", "O1"), 1024);
+    validateOperatorAttribute(dag, componentName("Mc", "M2", "O1"), 2048);
+
+    validatePortAttribute(dag, componentName("Ma", "M1", "O1"), 1);
+    validatePortAttribute(dag, componentName("Ma", "M2", "O1"), 2);
+    validatePortAttribute(dag, componentName("Mb", "M1", "O1"), 3);
+    validatePortAttribute(dag, componentName("Mc", "M1", "O1"), 1);
+    validatePortAttribute(dag, componentName("Mc", "M2", "O1"), 2);
   }
 
   private void validateOperatorParent(LogicalPlan dag, String operatorName, 
String parentModuleName)
@@ -549,4 +634,41 @@ public class TestModuleExpansion
     lpc.prepareDAG(dag, null, "ModuleApp");
     dag.validate();
   }
+
+  /**
+   * Verify attributes populated on DummyOperator from Level1 module
+   */
+  private void validateOperatorAttribute(LogicalPlan dag, String name, int 
memory)
+  {
+    LogicalPlan.OperatorMeta oMeta = dag.getOperatorMeta(name);
+    Attribute.AttributeMap attrs = oMeta.getAttributes();
+    Assert.assertEquals((int)attrs.get(OperatorContext.MEMORY_MB), memory);
+    Assert.assertEquals("Application window id is 2 ", 
(int)attrs.get(OperatorContext.APPLICATION_WINDOW_COUNT), 2);
+    Assert.assertEquals("Locality host is host1", 
attrs.get(OperatorContext.LOCALITY_HOST), "host1");
+    Assert.assertEquals(attrs.get(OperatorContext.PARTITIONER).getClass(), 
TestPartitioner.class);
+    Assert.assertEquals("Checkpoint window count ", 
(int)attrs.get(OperatorContext.CHECKPOINT_WINDOW_COUNT), 120);
+    Assert.assertEquals("Operator is stateless ", 
attrs.get(OperatorContext.STATELESS), true);
+    Assert.assertEquals("SPIN MILLIS is set to 20 ", 
(int)attrs.get(OperatorContext.SPIN_MILLIS), 20);
+
+  }
+
+  /**
+   * Validate attribute set on the port of DummyOperator in Level1Module
+   */
+  private void validatePortAttribute(LogicalPlan dag, String name, int memory)
+  {
+    LogicalPlan.InputPortMeta imeta = 
dag.getOperatorMeta(name).getInputStreams().keySet().iterator().next();
+    Assert.assertEquals(memory, 
(int)imeta.getAttributes().get(Context.PortContext.BUFFER_MEMORY_MB));
+  }
+
+  /**
+   * Validate that the named stream exists and that its locality matches the expected value.
+   */
+  private void validateStreamLocality(LogicalPlan dag, String name, 
DAG.Locality locality)
+  {
+    LogicalPlan.StreamMeta meta = dag.getStream(name);
+    Assert.assertTrue("Metadata for stream is available ", meta != null);
+    Assert.assertEquals("Locality is " + locality, meta.getLocality(), 
locality);
+  }
+
 }

Reply via email to