Repository: apex-core
Updated Branches:
  refs/heads/master 32f229f21 -> 7106b766a


APEXCORE-634 Apex Platform unable to set unifier attributes for modules in DAG.

Problem:     Unable to set Unifier attributes of output port within modules
Description: When modules are flatten in logical Plan of DAG, only top level 
attributes are cloned of OperatorMeta.
             Unifier attributes are not copied in PortMapping for output ports.
Solution:    Clone the unifier attributes while flattening DAG in logical Plan.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/7106b766
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/7106b766
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/7106b766

Branch: refs/heads/master
Commit: 7106b766a1f6774c62b56290547958e76bdcfb9a
Parents: 32f229f
Author: deepak-narkhede <[email protected]>
Authored: Tue Feb 7 16:39:07 2017 +0530
Committer: deepak-narkhede <[email protected]>
Committed: Wed Feb 22 14:52:03 2017 +0530

----------------------------------------------------------------------
 .../stram/plan/logical/LogicalPlan.java         |   4 +
 .../logical/LogicalPlanConfigurationTest.java   | 107 +++++++++++++++++++
 .../stram/plan/logical/MockStorageAgent.java    |   3 +-
 3 files changed, 113 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/7106b766/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java 
b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index e1debbb..0a1004c 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -1039,6 +1039,10 @@ public class LogicalPlan implements Serializable, DAG
       // copy Output port attributes
       for (Map.Entry<OutputPort<?>, OutputPortMeta> entry : 
operatorMeta.getPortMapping().outPortMap.entrySet()) {
         
copyAttributes(getPortMapping().outPortMap.get(entry.getKey()).attributes, 
entry.getValue().attributes);
+
+        // copy Unifier attributes
+        
copyAttributes(getPortMapping().outPortMap.get(entry.getKey()).getUnifierMeta().attributes,
+            entry.getValue().getUnifierMeta().attributes);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/7106b766/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
 
b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
index caa1bf3..18dfd99 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
@@ -53,6 +53,9 @@ import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Module;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.StreamingApplication;
@@ -60,12 +63,15 @@ import com.datatorrent.api.StringCodec;
 import com.datatorrent.api.StringCodec.Integer2String;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.common.codec.JsonStreamCodec;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.common.util.BasicContainerOptConfigurator;
 import com.datatorrent.stram.PartitioningTest.PartitionLoadWatch;
 import com.datatorrent.stram.client.StramClientUtils;
 import com.datatorrent.stram.engine.GenericTestOperator;
 import com.datatorrent.stram.engine.TestGeneratorInputOperator;
 import com.datatorrent.stram.plan.SchemaTestOperator;
+import com.datatorrent.stram.plan.TestPlanContext;
 import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
@@ -75,6 +81,9 @@ import 
com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.ConfElement;
 import 
com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.ContextUtils;
 import 
com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.StramElement;
 import 
com.datatorrent.stram.plan.logical.LogicalPlanTest.ValidationTestOperator;
+import com.datatorrent.stram.plan.physical.PTContainer;
+import com.datatorrent.stram.plan.physical.PTOperator;
+import com.datatorrent.stram.plan.physical.PhysicalPlan;
 import com.datatorrent.stram.support.StramTestSupport.RegexMatcher;
 
 import static org.junit.Assert.assertEquals;
@@ -600,6 +609,104 @@ public class LogicalPlanConfigurationTest
   }
 
   @Test
+  @SuppressWarnings({"UnnecessaryBoxing", 
"AssertEqualsBetweenInconvertibleTypes"})
+  public void testModuleUnifierLevelAttributes()
+  {
+    class DummyOperator extends BaseOperator
+    {
+      int prop;
+
+      public transient DefaultInputPort<Integer> input = new 
DefaultInputPort<Integer>()
+      {
+        @Override
+        public void process(Integer tuple)
+        {
+          LOG.debug(tuple.intValue() + " processed");
+          output.emit(tuple);
+        }
+      };
+      public transient DefaultOutputPort<Integer> output = new 
DefaultOutputPort<Integer>();
+    }
+
+    class DummyOutputOperator extends BaseOperator
+    {
+      int prop;
+
+      public transient DefaultInputPort<Integer> input = new 
DefaultInputPort<Integer>()
+      {
+        @Override
+        public void process(Integer tuple)
+        {
+          LOG.debug(tuple.intValue() + " processed");
+        }
+      };
+    }
+
+    class TestUnifierAttributeModule implements Module
+    {
+      public transient ProxyInputPort<Integer> moduleInput = new 
ProxyInputPort<Integer>();
+      public transient ProxyOutputPort<Integer> moduleOutput = new 
Module.ProxyOutputPort<Integer>();
+
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+        DummyOperator dummyOperator = dag.addOperator("DummyOperator", new 
DummyOperator());
+        dag.setOperatorAttribute(dummyOperator, 
Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<DummyOperator>(3));
+        dag.setUnifierAttribute(dummyOperator.output, 
OperatorContext.TIMEOUT_WINDOW_COUNT, 2);
+        moduleInput.set(dummyOperator.input);
+        moduleOutput.set(dummyOperator.output);
+      }
+    }
+
+    StreamingApplication app = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+        Module m1 = dag.addModule("TestModule", new 
TestUnifierAttributeModule());
+        DummyOutputOperator dummyOutputOperator = 
dag.addOperator("DummyOutputOperator", new DummyOutputOperator());
+        dag.addStream("Module To Operator", 
((TestUnifierAttributeModule)m1).moduleOutput, dummyOutputOperator.input);
+      }
+    };
+
+    String appName = "UnifierApp";
+    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new 
Configuration(false));
+    LogicalPlan dag = new LogicalPlan();
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new 
MockStorageAgent());
+    dagBuilder.prepareDAG(dag, app, appName);
+    LogicalPlan.OperatorMeta ometa = 
dag.getOperatorMeta("TestModule$DummyOperator");
+    LogicalPlan.OperatorMeta om = null;
+    for (Map.Entry<LogicalPlan.OutputPortMeta, LogicalPlan.StreamMeta> entry : 
ometa.getOutputStreams().entrySet()) {
+      if (entry.getKey().getPortName().equals("output")) {
+        om = entry.getKey().getUnifierMeta();
+      }
+    }
+
+    /*
+     * Verify the attribute value after preparing DAG.
+     */
+    Assert.assertNotNull(om);
+    Assert.assertEquals("", Integer.valueOf(2), 
om.getValue(Context.OperatorContext.TIMEOUT_WINDOW_COUNT));
+
+    PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
+    List<PTContainer> containers = plan.getContainers();
+    LogicalPlan.OperatorMeta operatorMeta = null;
+    for (PTContainer container : containers) {
+      List<PTOperator> operators = container.getOperators();
+      for (PTOperator operator : operators) {
+        if (operator.isUnifier()) {
+          operatorMeta = operator.getOperatorMeta();
+        }
+      }
+    }
+
+    /*
+     * Verify attribute after physical plan creation with partitioned 
operators.
+     */
+    Assert.assertEquals("", Integer.valueOf(2), 
operatorMeta.getValue(OperatorContext.TIMEOUT_WINDOW_COUNT));
+  }
+
+  @Test
   public void testOperatorLevelProperties()
   {
     String appName = "app1";

http://git-wip-us.apache.org/repos/asf/apex-core/blob/7106b766/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java 
b/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java
index 36b5fb2..a28c375 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java
@@ -31,7 +31,8 @@ public class MockStorageAgent implements StorageAgent
   @Override
   public void save(Object object, int operatorId, long windowId) throws 
IOException
   {
-    throw new UnsupportedOperationException("Not supported yet.");
+    // Do nothing for now
+    return;
   }
 
   @Override

Reply via email to