Repository: apex-core
Updated Branches:
refs/heads/master 32f229f21 -> 7106b766a
APEXCORE-634 Apex Platform unable to set unifier attributes for modules in DAG.
Problem: Unable to set Unifier attributes of output port within modules
Description: When modules are flatten in logical Plan of DAG, only top level
attributes are cloned of OperatorMeta.
Unifier attributes are not copied in PortMapping for output ports.
Solution: Clone the unifier attributes while flattening DAG in logical Plan.
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/7106b766
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/7106b766
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/7106b766
Branch: refs/heads/master
Commit: 7106b766a1f6774c62b56290547958e76bdcfb9a
Parents: 32f229f
Author: deepak-narkhede <[email protected]>
Authored: Tue Feb 7 16:39:07 2017 +0530
Committer: deepak-narkhede <[email protected]>
Committed: Wed Feb 22 14:52:03 2017 +0530
----------------------------------------------------------------------
.../stram/plan/logical/LogicalPlan.java | 4 +
.../logical/LogicalPlanConfigurationTest.java | 107 +++++++++++++++++++
.../stram/plan/logical/MockStorageAgent.java | 3 +-
3 files changed, 113 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7106b766/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git
a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index e1debbb..0a1004c 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -1039,6 +1039,10 @@ public class LogicalPlan implements Serializable, DAG
// copy Output port attributes
for (Map.Entry<OutputPort<?>, OutputPortMeta> entry :
operatorMeta.getPortMapping().outPortMap.entrySet()) {
copyAttributes(getPortMapping().outPortMap.get(entry.getKey()).attributes,
entry.getValue().attributes);
+
+ // copy Unifier attributes
+
copyAttributes(getPortMapping().outPortMap.get(entry.getKey()).getUnifierMeta().attributes,
+ entry.getValue().getUnifierMeta().attributes);
}
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7106b766/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
----------------------------------------------------------------------
diff --git
a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
index caa1bf3..18dfd99 100644
---
a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
+++
b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
@@ -53,6 +53,9 @@ import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Module;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StreamingApplication;
@@ -60,12 +63,15 @@ import com.datatorrent.api.StringCodec;
import com.datatorrent.api.StringCodec.Integer2String;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.codec.JsonStreamCodec;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.BasicContainerOptConfigurator;
import com.datatorrent.stram.PartitioningTest.PartitionLoadWatch;
import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.SchemaTestOperator;
+import com.datatorrent.stram.plan.TestPlanContext;
import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
@@ -75,6 +81,9 @@ import
com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.ConfElement;
import
com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.ContextUtils;
import
com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.StramElement;
import
com.datatorrent.stram.plan.logical.LogicalPlanTest.ValidationTestOperator;
+import com.datatorrent.stram.plan.physical.PTContainer;
+import com.datatorrent.stram.plan.physical.PTOperator;
+import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.support.StramTestSupport.RegexMatcher;
import static org.junit.Assert.assertEquals;
@@ -600,6 +609,104 @@ public class LogicalPlanConfigurationTest
}
@Test
+ @SuppressWarnings({"UnnecessaryBoxing",
"AssertEqualsBetweenInconvertibleTypes"})
+ public void testModuleUnifierLevelAttributes()
+ {
+ class DummyOperator extends BaseOperator
+ {
+ int prop;
+
+ public transient DefaultInputPort<Integer> input = new
DefaultInputPort<Integer>()
+ {
+ @Override
+ public void process(Integer tuple)
+ {
+ LOG.debug(tuple.intValue() + " processed");
+ output.emit(tuple);
+ }
+ };
+ public transient DefaultOutputPort<Integer> output = new
DefaultOutputPort<Integer>();
+ }
+
+ class DummyOutputOperator extends BaseOperator
+ {
+ int prop;
+
+ public transient DefaultInputPort<Integer> input = new
DefaultInputPort<Integer>()
+ {
+ @Override
+ public void process(Integer tuple)
+ {
+ LOG.debug(tuple.intValue() + " processed");
+ }
+ };
+ }
+
+ class TestUnifierAttributeModule implements Module
+ {
+ public transient ProxyInputPort<Integer> moduleInput = new
ProxyInputPort<Integer>();
+ public transient ProxyOutputPort<Integer> moduleOutput = new
Module.ProxyOutputPort<Integer>();
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ DummyOperator dummyOperator = dag.addOperator("DummyOperator", new
DummyOperator());
+ dag.setOperatorAttribute(dummyOperator,
Context.OperatorContext.PARTITIONER, new
StatelessPartitioner<DummyOperator>(3));
+ dag.setUnifierAttribute(dummyOperator.output,
OperatorContext.TIMEOUT_WINDOW_COUNT, 2);
+ moduleInput.set(dummyOperator.input);
+ moduleOutput.set(dummyOperator.output);
+ }
+ }
+
+ StreamingApplication app = new StreamingApplication()
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ Module m1 = dag.addModule("TestModule", new
TestUnifierAttributeModule());
+ DummyOutputOperator dummyOutputOperator =
dag.addOperator("DummyOutputOperator", new DummyOutputOperator());
+ dag.addStream("Module To Operator",
((TestUnifierAttributeModule)m1).moduleOutput, dummyOutputOperator.input);
+ }
+ };
+
+ String appName = "UnifierApp";
+ LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new
Configuration(false));
+ LogicalPlan dag = new LogicalPlan();
+ dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new
MockStorageAgent());
+ dagBuilder.prepareDAG(dag, app, appName);
+ LogicalPlan.OperatorMeta ometa =
dag.getOperatorMeta("TestModule$DummyOperator");
+ LogicalPlan.OperatorMeta om = null;
+ for (Map.Entry<LogicalPlan.OutputPortMeta, LogicalPlan.StreamMeta> entry :
ometa.getOutputStreams().entrySet()) {
+ if (entry.getKey().getPortName().equals("output")) {
+ om = entry.getKey().getUnifierMeta();
+ }
+ }
+
+ /*
+ * Verify the attribute value after preparing DAG.
+ */
+ Assert.assertNotNull(om);
+ Assert.assertEquals("", Integer.valueOf(2),
om.getValue(Context.OperatorContext.TIMEOUT_WINDOW_COUNT));
+
+ PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
+ List<PTContainer> containers = plan.getContainers();
+ LogicalPlan.OperatorMeta operatorMeta = null;
+ for (PTContainer container : containers) {
+ List<PTOperator> operators = container.getOperators();
+ for (PTOperator operator : operators) {
+ if (operator.isUnifier()) {
+ operatorMeta = operator.getOperatorMeta();
+ }
+ }
+ }
+
+ /*
+ * Verify attribute after physical plan creation with partitioned
operators.
+ */
+ Assert.assertEquals("", Integer.valueOf(2),
operatorMeta.getValue(OperatorContext.TIMEOUT_WINDOW_COUNT));
+ }
+
+ @Test
public void testOperatorLevelProperties()
{
String appName = "app1";
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7106b766/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java
----------------------------------------------------------------------
diff --git
a/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java
b/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java
index 36b5fb2..a28c375 100644
---
a/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java
+++
b/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java
@@ -31,7 +31,8 @@ public class MockStorageAgent implements StorageAgent
@Override
public void save(Object object, int operatorId, long windowId) throws
IOException
{
- throw new UnsupportedOperationException("Not supported yet.");
+ // Do nothing for now
+ return;
}
@Override