Repository: apex-core Updated Branches: refs/heads/master 9e89de445 -> 558942aa5
APEXCORE-470 - Added the new api setOperatorAttribute and updated the tests, also marked setAttribute(Operator... as deprecated Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/558942aa Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/558942aa Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/558942aa Branch: refs/heads/master Commit: 558942aa584aad9d9de72fbae466e8a2039e79c2 Parents: 9e89de4 Author: sandeshh <[email protected]> Authored: Tue Jun 7 16:27:26 2016 -0700 Committer: sandeshh <[email protected]> Committed: Tue Jun 14 16:17:23 2016 -0700 ---------------------------------------------------------------------- api/src/main/java/com/datatorrent/api/DAG.java | 13 ++- .../stram/plan/logical/LogicalPlan.java | 6 ++ .../datatorrent/stram/AffinityRulesTest.java | 6 +- .../com/datatorrent/stram/CheckpointTest.java | 6 +- .../com/datatorrent/stram/HostLocalTest.java | 24 ++--- .../datatorrent/stram/OutputUnifiedTest.java | 8 +- .../com/datatorrent/stram/PartitioningTest.java | 6 +- .../com/datatorrent/stram/StreamCodecTest.java | 32 +++--- .../stram/StreamingContainerManagerTest.java | 8 +- .../stram/engine/AutoMetricTest.java | 10 +- .../datatorrent/stram/engine/SliderTest.java | 4 +- .../com/datatorrent/stram/engine/StatsTest.java | 6 +- .../stram/plan/StreamPersistanceTests.java | 4 +- .../stram/plan/logical/DelayOperatorTest.java | 2 +- .../stram/plan/logical/LogicalPlanTest.java | 34 +++---- .../stram/plan/physical/PhysicalPlanTest.java | 102 +++++++++---------- .../datatorrent/stram/stream/OiOStreamTest.java | 10 +- 17 files changed, 149 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/api/src/main/java/com/datatorrent/api/DAG.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java index 1518fcf..b80bc93 100644 --- a/api/src/main/java/com/datatorrent/api/DAG.java +++ b/api/src/main/java/com/datatorrent/api/DAG.java @@ -236,11 +236,22 @@ public interface DAG extends DAGContext, Serializable public abstract <T> void setAttribute(Attribute<T> key, T value); /** - * <p>setAttribute.</p> + * @Deprecated + * Use {@link #setOperatorAttribute} instead */ + @Deprecated public abstract <T> void setAttribute(Operator operator, Attribute<T> key, T value); /** + * Set an attribute for an operator. + * @param <T> Value type of the attribute. + * @param operator The Operator for which the attribute is being set. + * @param key The attribute which needs to be tuned. + * @param value The new value of the attribute. + */ + public abstract <T> void setOperatorAttribute(Operator operator, Attribute<T> key, T value); + + /** * <p>setOutputPortAttribute.</p> */ public abstract <T> void setOutputPortAttribute(Operator.OutputPort<?> port, Attribute<T> key, T value); http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index af6b1bc..b301f9e 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -1476,6 +1476,12 @@ public class LogicalPlan implements Serializable, DAG @Override public <T> void setAttribute(Operator operator, Attribute<T> key, T value) { + setOperatorAttribute(operator, key, value); + } + + @Override + public <T> void setOperatorAttribute(Operator operator, Attribute<T> key, T value) + { this.getMeta(operator).attributes.put(key, value); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/AffinityRulesTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/AffinityRulesTest.java b/engine/src/test/java/com/datatorrent/stram/AffinityRulesTest.java index 88bf133..23e3a97 100644 --- a/engine/src/test/java/com/datatorrent/stram/AffinityRulesTest.java +++ b/engine/src/test/java/com/datatorrent/stram/AffinityRulesTest.java @@ -70,7 +70,7 @@ public class AffinityRulesTest dag.addStream("stream1", o1.outport, o2.inport1); dag.addStream("stream2", o2.outport1, o3.inport1); - dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(5)); + dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(5)); AffinityRulesSet ruleSet = new AffinityRulesSet(); // Valid case: @@ -137,10 +137,10 @@ public class AffinityRulesTest dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); GenericTestOperator o1 = dag.addOperator("O1", GenericTestOperator.class); - dag.setAttribute(o1, OperatorContext.MEMORY_MB, 256); + dag.setOperatorAttribute(o1, OperatorContext.MEMORY_MB, 256); GenericTestOperator o2 = dag.addOperator("O2", GenericTestOperator.class); - dag.setAttribute(o2, OperatorContext.MEMORY_MB, 256); + dag.setOperatorAttribute(o2, OperatorContext.MEMORY_MB, 256); dag.getMeta(o1).getAttributes().put(OperatorContext.LOCALITY_HOST, "host1"); AffinityRulesSet ruleSet = new AffinityRulesSet(); http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java index 5675b53..db939cd 100644 --- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java +++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java @@ -139,7 +139,7 @@ public class CheckpointTest MockInputOperator o1 = dag.addOperator("o1", new MockInputOperator()); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); - dag.setAttribute(o2, OperatorContext.STATELESS, true); + dag.setOperatorAttribute(o2, OperatorContext.STATELESS, true); dag.addStream("o1.outport", o1.outport, o2.inport1).setLocality(Locality.CONTAINER_LOCAL); @@ -433,7 +433,7 @@ public class CheckpointTest dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); - dag.setAttribute(o1, OperatorContext.TIMEOUT_WINDOW_COUNT, 2); + dag.setOperatorAttribute(o1, OperatorContext.TIMEOUT_WINDOW_COUNT, 2); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); @@ -495,7 +495,7 @@ public class CheckpointTest MockInputOperator o1 = dag.addOperator("o1", new MockInputOperator()); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); - dag.setAttribute(o2, OperatorContext.STATELESS, true); + dag.setOperatorAttribute(o2, OperatorContext.STATELESS, true); dag.addStream("o1.outport", o1.outport, o2.inport1); http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/HostLocalTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/HostLocalTest.java b/engine/src/test/java/com/datatorrent/stram/HostLocalTest.java index a2c58cb..b271949 100644 --- a/engine/src/test/java/com/datatorrent/stram/HostLocalTest.java +++ b/engine/src/test/java/com/datatorrent/stram/HostLocalTest.java @@ -114,10 +114,10 @@ public class HostLocalTest GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); - dag.setAttribute(o1,OperatorContext.MEMORY_MB,256); + dag.setOperatorAttribute(o1,OperatorContext.MEMORY_MB,256); GenericTestOperator partitioned = dag.addOperator("partitioned", GenericTestOperator.class); - dag.setAttribute(partitioned,OperatorContext.MEMORY_MB,256); + dag.setOperatorAttribute(partitioned,OperatorContext.MEMORY_MB,256); dag.getMeta(partitioned).getAttributes().put(OperatorContext.LOCALITY_HOST, "host1"); dag.addStream("o1_outport1", o1.outport1, partitioned.inport1).setLocality(Locality.NODE_LOCAL); @@ -158,8 +158,8 @@ public class HostLocalTest GenericTestOperator partitioned = dag.addOperator("partitioned", GenericTestOperator.class); dag.addStream("o1_outport1", o1.outport1, partitioned.inport1).setLocality(Locality.THREAD_LOCAL); - dag.setAttribute(o1,OperatorContext.MEMORY_MB,256); - dag.setAttribute(partitioned,OperatorContext.MEMORY_MB,256); + dag.setOperatorAttribute(o1,OperatorContext.MEMORY_MB,256); + dag.setOperatorAttribute(partitioned,OperatorContext.MEMORY_MB,256); StreamingContainerManager scm = new StreamingContainerManager(dag); @@ -196,8 +196,8 @@ public class HostLocalTest GenericTestOperator partitioned = dag.addOperator("partitioned", GenericTestOperator.class); dag.addStream("o1_outport1", o1.outport1, partitioned.inport1).setLocality(Locality.CONTAINER_LOCAL); - dag.setAttribute(o1, OperatorContext.MEMORY_MB, 256); - dag.setAttribute(partitioned,OperatorContext.MEMORY_MB,256); + dag.setOperatorAttribute(o1, OperatorContext.MEMORY_MB, 256); + dag.setOperatorAttribute(partitioned,OperatorContext.MEMORY_MB,256); StreamingContainerManager scm = new StreamingContainerManager(dag); @@ -232,9 +232,9 @@ public class HostLocalTest GenericTestOperator partitioned = dag.addOperator("partitioned", GenericTestOperator.class); dag.addStream("o1_outport1", o1.outport1, partitioned.inport1).setLocality(Locality.CONTAINER_LOCAL); - dag.setAttribute(o1,OperatorContext.MEMORY_MB,256); - dag.setAttribute(o1,OperatorContext.VCORES,1); - dag.setAttribute(partitioned,OperatorContext.VCORES,1); + dag.setOperatorAttribute(o1,OperatorContext.MEMORY_MB,256); + dag.setOperatorAttribute(o1,OperatorContext.VCORES,1); + dag.setOperatorAttribute(partitioned,OperatorContext.VCORES,1); StreamingContainerManager scm = new StreamingContainerManager(dag); @@ -272,9 +272,9 @@ public class HostLocalTest GenericTestOperator partitioned = dag.addOperator("partitioned", GenericTestOperator.class); dag.addStream("o1_outport1", o1.outport1, partitioned.inport1).setLocality(Locality.CONTAINER_LOCAL); - dag.setAttribute(o1, OperatorContext.MEMORY_MB, 256); - dag.setAttribute(o1, OperatorContext.VCORES, 2); - dag.setAttribute(partitioned, OperatorContext.VCORES, 1); + dag.setOperatorAttribute(o1, OperatorContext.MEMORY_MB, 256); + dag.setOperatorAttribute(o1, OperatorContext.VCORES, 2); + dag.setOperatorAttribute(partitioned, OperatorContext.VCORES, 1); StreamingContainerManager scm = new StreamingContainerManager(dag); http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java b/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java index 470a666..e83c7e4 100644 --- a/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java +++ b/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java @@ -68,7 +68,7 @@ public class OutputUnifiedTest GenericTestOperator op1 = new GenericTestOperator(); dag.addOperator("op1", op1); - dag.setAttribute(op1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); + dag.setOperatorAttribute(op1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); TestOutputOperator op2 = new TestOutputOperator(); dag.addOperator("op2", op2); @@ -97,12 +97,12 @@ public class OutputUnifiedTest GenericTestOperator op1 = new GenericTestOperator(); dag.addOperator("op1", op1); - dag.setAttribute(op1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); + dag.setOperatorAttribute(op1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); TestOutputOperator op2 = new TestOutputOperator(); dag.addOperator("op2", op2); - dag.setAttribute(op2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); + dag.setOperatorAttribute(op2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); dag.addStream("s1", i1.output, op1.inport1); dag.addStream("s2", op1.outport1, op2.inport); @@ -125,7 +125,7 @@ public class OutputUnifiedTest TestInputOperator i1 = new TestInputOperator(); dag.addOperator("i1", i1); - dag.setAttribute(i1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); + dag.setOperatorAttribute(i1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); GenericTestOperator op1 = new GenericTestOperator(); dag.addOperator("op1", op1); http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java index 26575a4..4f8becd 100644 --- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java +++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java @@ -281,8 +281,8 @@ public class PartitioningTest CollectorOperator collector = dag.addOperator("partitionedCollector", new CollectorOperator()); collector.prefix = "" + System.identityHashCode(collector); - dag.setAttribute(collector, OperatorContext.PARTITIONER, new StatelessPartitioner<CollectorOperator>(2)); - dag.setAttribute(collector, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitionLoadWatch()})); + dag.setOperatorAttribute(collector, OperatorContext.PARTITIONER, new StatelessPartitioner<CollectorOperator>(2)); + dag.setOperatorAttribute(collector, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitionLoadWatch()})); dag.addStream("fromInput", input.output, collector.input); CollectorOperator singleCollector = dag.addOperator("singleCollector", new CollectorOperator()); @@ -424,7 +424,7 @@ public class PartitioningTest dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath(), null)); PartitionableInputOperator input = dag.addOperator("input", new PartitionableInputOperator()); - dag.setAttribute(input, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitionLoadWatch()})); + dag.setOperatorAttribute(input, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitionLoadWatch()})); StramLocalCluster lc = new StramLocalCluster(dag); lc.setHeartbeatMonitoringEnabled(false); http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java index d9a9ee4..3d3f7b0 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java @@ -226,7 +226,7 @@ public class StreamCodecTest { GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); - dag.setAttribute(node2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); + dag.setOperatorAttribute(node2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); TestStreamCodec serDe = new TestStreamCodec(); dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe); @@ -273,9 +273,9 @@ public class StreamCodecTest public void testMxNPartitioningStreamCodec() { GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); - dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); + dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); - dag.setAttribute(node2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); + dag.setOperatorAttribute(node2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); TestStreamCodec serDe = new TestStreamCodec(); dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe); @@ -337,7 +337,7 @@ public class StreamCodecTest public void testParallelPartitioningStreamCodec() { GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); - dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); + dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); dag.setInputPortAttribute(node2.inport1, Context.PortContext.PARTITION_PARALLEL, true); TestStreamCodec serDe = new TestStreamCodec(); @@ -478,7 +478,7 @@ public class StreamCodecTest { GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); - dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); + dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); TestStreamCodec serDe = new TestStreamCodec(); dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe); GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class); @@ -609,7 +609,7 @@ public class StreamCodecTest GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class); - dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); + dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); TestStreamCodec serDe = new TestStreamCodec(); dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe); TestStreamCodec2 serDe2 = new TestStreamCodec2(); @@ -690,13 +690,13 @@ public class StreamCodecTest public void testMxNMultipleStreamCodecs() { GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); - dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); + dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); - dag.setAttribute(node2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); + dag.setOperatorAttribute(node2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); TestStreamCodec serDe = new TestStreamCodec(); dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe); GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class); - dag.setAttribute(node3, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); + dag.setOperatorAttribute(node3, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); TestStreamCodec serDe2 = new TestStreamCodec(); dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, serDe2); @@ -873,7 +873,7 @@ public class StreamCodecTest GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class); - dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); + dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); dag.setOutputPortAttribute(node1.outport1, Context.PortContext.UNIFIER_LIMIT, 2); TestStreamCodec serDe = new TestStreamCodec(); dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe); @@ -955,16 +955,16 @@ public class StreamCodecTest public void testDynamicPartitioningStreamCodec() { GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); - dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); - dag.setAttribute(node1, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch())); + dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); + dag.setOperatorAttribute(node1, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch())); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); - dag.setAttribute(node2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); - dag.setAttribute(node2, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); + dag.setOperatorAttribute(node2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); + dag.setOperatorAttribute(node2, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); TestStreamCodec serDe = new TestStreamCodec(); dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, serDe); GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class); - dag.setAttribute(node3, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); - dag.setAttribute(node3, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); + dag.setOperatorAttribute(node3, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); + dag.setOperatorAttribute(node3, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); TestStreamCodec serDe2 = new TestStreamCodec(); dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, serDe2); http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index 385e1ae..11b938c 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -273,7 +273,7 @@ public class StreamingContainerManagerTest GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); PhysicalPlanTest.PartitioningTestOperator node2 = dag.addOperator("node2", PhysicalPlanTest.PartitioningTestOperator.class); node2.setPartitionCount(3); - dag.setAttribute(node2, OperatorContext.SPIN_MILLIS, 10); /* this should not affect anything materially */ + dag.setOperatorAttribute(node2, OperatorContext.SPIN_MILLIS, 10); /* this should not affect anything materially */ dag.setOutputPortAttribute(node2.outport1, PortContext.QUEUE_CAPACITY, 1111); GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class); dag.setInputPortAttribute(node3.inport1, PortContext.QUEUE_CAPACITY, 2222); @@ -515,7 +515,7 @@ public class StreamingContainerManagerTest public void testProcessHeartbeat() throws Exception { TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); - dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); + dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); StreamingContainerManager scm = new StreamingContainerManager(dag); @@ -722,7 +722,7 @@ public class StreamingContainerManagerTest dag.addStream("stream1", o1.outport1, o2.inport1); dag.addStream("stream2", o2.outport1, o3.inport1); - dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); + dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); StreamingContainerManager scm = new StreamingContainerManager(dag); PhysicalPlan physicalPlan = scm.getPhysicalPlan(); @@ -810,7 +810,7 @@ public class StreamingContainerManagerTest { TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); - dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); + dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); dag.addStream("o1Output1", o1.outport, o2.inport1).setLocality(locality); int maxContainers = 5; http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java index 89e9c2f..4b72360 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java @@ -204,7 +204,7 @@ public class AutoMetricTest TestOperator testOper = dag.addOperator("TestOperator", TestOperator.class); TestStatsListener sl = new TestStatsListener(); - dag.setAttribute(testOper, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl)); + dag.setOperatorAttribute(testOper, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl)); GenericTestOperator collector = dag.addOperator("Collector", new GenericTestOperator()); dag.addStream("TestTuples", testOper.outport, collector.inport1).setLocality(Locality.CONTAINER_LOCAL); @@ -249,7 +249,7 @@ public class AutoMetricTest OperatorWithMetrics o1 = dag.addOperator("o1", OperatorWithMetrics.class); MockAggregator aggregator = new MockAggregator(latch); - dag.setAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, aggregator); + dag.setOperatorAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, aggregator); dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent()); @@ -277,10 +277,10 @@ public class AutoMetricTest OperatorWithMetrics o1 = dag.addOperator("o1", OperatorWithMetrics.class); MockAggregator aggregator = new MockAggregator(latch); - dag.setAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, aggregator); + dag.setOperatorAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, aggregator); dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent()); - dag.setAttribute(o1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TestGeneratorInputOperator>(2)); + dag.setOperatorAttribute(o1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TestGeneratorInputOperator>(2)); dag.addStream("TestTuples", inputOperator.outport, o1.inport1); @@ -428,7 +428,7 @@ public class AutoMetricTest OperatorWithMetricMethod o1 = dag.addOperator("o1", OperatorWithMetricMethod.class); MockAggregator aggregator = new MockAggregator(latch); - dag.setAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, aggregator); + dag.setOperatorAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, aggregator); dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent()); http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java index c3e18b1..8ba57be 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java @@ -147,8 +147,8 @@ public class SliderTest dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 100); Input input = dag.addOperator("Input", new Input()); Sum sum = dag.addOperator("Sum", new Sum()); - dag.setAttribute(sum, OperatorContext.APPLICATION_WINDOW_COUNT, applicationWindowCount); - dag.setAttribute(sum, OperatorContext.SLIDE_BY_WINDOW_COUNT, slideByWindowCount); + dag.setOperatorAttribute(sum, OperatorContext.APPLICATION_WINDOW_COUNT, applicationWindowCount); + dag.setOperatorAttribute(sum, OperatorContext.SLIDE_BY_WINDOW_COUNT, slideByWindowCount); Validator validate = dag.addOperator("validator", new Validator()); Validator.numbersValidated = 0; validate.numberOfIntegers = applicationWindowCount; http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java index 3968e4a..4625368 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java @@ -182,13 +182,13 @@ public class StatsTest dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null)); TestOperator testOper = dag.addOperator("TestOperator", TestOperator.class); TestInputStatsListener testInputStatsListener = new TestInputStatsListener(); - dag.setAttribute(testOper, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{testInputStatsListener})); + dag.setOperatorAttribute(testOper, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{testInputStatsListener})); testOper.setMaxTuples(tupleCount); testOper.setEmitInterval(0); TestCollector collector = dag.addOperator("Collector", new TestCollector()); TestCollectorStatsListener testCollectorStatsListener = new TestCollectorStatsListener(); - dag.setAttribute(collector, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{testCollectorStatsListener})); + dag.setOperatorAttribute(collector, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{testCollectorStatsListener})); dag.addStream("TestTuples", testOper.outport, collector.inport1).setLocality(null); StramLocalCluster lc = new StramLocalCluster(dag); @@ -241,7 +241,7 @@ public class StatsTest TestCollector collector = dag.addOperator("Collector", new TestCollector()); if (statsListener != null) { - dag.setAttribute(collector, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{statsListener})); + dag.setOperatorAttribute(collector, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{statsListener})); } dag.addStream("TestTuples", testOper.outport, collector.inport1).setLocality(locality); http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java index 299172f..2457786 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java @@ -950,8 +950,8 @@ public class StreamPersistanceTests AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator()); final TestReceiverOperator console = dag.addOperator("console", new TestReceiverOperator()); - dag.setAttribute(console, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TestReceiverOperator>(2)); - dag.setAttribute(console, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch())); + dag.setOperatorAttribute(console, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TestReceiverOperator>(2)); + dag.setOperatorAttribute(console, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch())); final PartitionedTestPersistanceOperator console1 = new PartitionedTestPersistanceOperator(); http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java index 75b20fe..821f4ea 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java @@ -122,7 +122,7 @@ public class DelayOperatorTest opC = dag.addOperator("C", GenericTestOperator.class); opD = dag.addOperator("D", GenericTestOperator.class); opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class); - dag.setAttribute(opDelay, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 2); + dag.setOperatorAttribute(opDelay, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 2); dag.addStream("BtoC", opB.outport1, opC.inport1); dag.addStream("CtoD", opC.outport1, opD.inport1); dag.addStream("CtoDelay", opC.outport2, opDelay.input); http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java index 9a4e0e8..d4c68aa 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java @@ -474,7 +474,7 @@ public class LogicalPlanTest dag.addStream("Connection", input.outport, operator.input1); - dag.setAttribute(operator, OperatorContext.PARTITIONER, new StatelessPartitioner<TestOperatorAnnotationOperator>(2)); + dag.setOperatorAttribute(operator, OperatorContext.PARTITIONER, new StatelessPartitioner<TestOperatorAnnotationOperator>(2)); try { dag.validate(); @@ -483,7 +483,7 @@ public class LogicalPlanTest Assert.assertEquals("", "Operator " + dag.getMeta(operator).getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage()); } - dag.setAttribute(operator, OperatorContext.PARTITIONER, null); + dag.setOperatorAttribute(operator, OperatorContext.PARTITIONER, null); dag.setInputPortAttribute(operator.input1, PortContext.PARTITION_PARALLEL, true); try { @@ -545,13 +545,13 @@ public class LogicalPlanTest TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class); GenericTestOperator amoOper = dag.addOperator("amoOper", GenericTestOperator.class); - dag.setAttribute(amoOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_MOST_ONCE); + dag.setOperatorAttribute(amoOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_MOST_ONCE); dag.addStream("input1.outport", input1.outport, amoOper.inport1); dag.addStream("input2.outport", input2.outport, amoOper.inport2); GenericTestOperator outputOper = dag.addOperator("outputOper", GenericTestOperator.class); - dag.setAttribute(outputOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_LEAST_ONCE); + dag.setOperatorAttribute(outputOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_LEAST_ONCE); dag.addStream("aloOper.outport1", amoOper.outport1, outputOper.inport1); try { @@ -560,7 +560,7 @@ public class LogicalPlanTest } catch (ValidationException ve) { Assert.assertEquals("", ve.getMessage(), "Processing mode outputOper/AT_LEAST_ONCE not valid for source amoOper/AT_MOST_ONCE"); } - dag.setAttribute(outputOper, OperatorContext.PROCESSING_MODE, null); + dag.setOperatorAttribute(outputOper, OperatorContext.PROCESSING_MODE, null); dag.validate(); OperatorMeta outputOperOm = dag.getMeta(outputOper); @@ -577,7 +577,7 @@ public class LogicalPlanTest TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class); GenericTestOperator amoOper = dag.addOperator("amoOper", GenericTestOperator.class); - dag.setAttribute(amoOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.EXACTLY_ONCE); + dag.setOperatorAttribute(amoOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.EXACTLY_ONCE); dag.addStream("input1.outport", input1.outport, amoOper.inport1); dag.addStream("input2.outport", input2.outport, amoOper.inport2); @@ -592,7 +592,7 @@ public class LogicalPlanTest Assert.assertEquals("", ve.getMessage(), "Processing mode for outputOper should be AT_MOST_ONCE for source amoOper/EXACTLY_ONCE"); } - dag.setAttribute(outputOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_LEAST_ONCE); + dag.setOperatorAttribute(outputOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_LEAST_ONCE); try { dag.validate(); @@ -602,7 +602,7 @@ public class LogicalPlanTest } // AT_MOST_ONCE is valid - dag.setAttribute(outputOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_MOST_ONCE); + dag.setOperatorAttribute(outputOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_MOST_ONCE); dag.validate(); } @@ -803,7 +803,7 @@ public class LogicalPlanTest // Operator attribute not serializable test dag = new LogicalPlan(); TestGeneratorInputOperator operator = dag.addOperator("TestOperator", TestGeneratorInputOperator.class); - dag.setAttribute(operator, attr, new TestAttributeValue()); + dag.setOperatorAttribute(operator, attr, new TestAttributeValue()); try { dag.validate(); Assert.fail("Setting not serializable attribute should throw exception"); @@ -961,22 +961,22 @@ public class LogicalPlanTest TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class); GenericTestOperator x = dag.addOperator("x", new GenericTestOperator()); dag.addStream("Stream1", input1.outport, x.inport1); - dag.setAttribute(x, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15); - dag.setAttribute(x, OperatorContext.APPLICATION_WINDOW_COUNT, 30); + dag.setOperatorAttribute(x, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15); + dag.setOperatorAttribute(x, OperatorContext.APPLICATION_WINDOW_COUNT, 30); dag.validate(); TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class); CheckpointableWithinAppWindowOperator y = dag.addOperator("y", new CheckpointableWithinAppWindowOperator()); dag.addStream("Stream2", input2.outport, y.inport1); - dag.setAttribute(y, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15); - dag.setAttribute(y, OperatorContext.APPLICATION_WINDOW_COUNT, 30); + dag.setOperatorAttribute(y, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15); + dag.setOperatorAttribute(y, OperatorContext.APPLICATION_WINDOW_COUNT, 30); dag.validate(); TestGeneratorInputOperator input3 = dag.addOperator("input3", TestGeneratorInputOperator.class); NotCheckpointableWithinAppWindowOperator z = dag.addOperator("z", new NotCheckpointableWithinAppWindowOperator()); dag.addStream("Stream3", input3.outport, z.inport1); - dag.setAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15); - dag.setAttribute(z, OperatorContext.APPLICATION_WINDOW_COUNT, 30); + dag.setOperatorAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15); + dag.setOperatorAttribute(z, OperatorContext.APPLICATION_WINDOW_COUNT, 30); try { dag.validate(); Assert.fail("should fail because chekpoint window count is not a factor of application window count"); @@ -984,10 +984,10 @@ public class LogicalPlanTest // expected } - dag.setAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 30); + dag.setOperatorAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 30); dag.validate(); - dag.setAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 45); + dag.setOperatorAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 45); try { dag.validate(); Assert.fail("should fail because chekpoint window count is not a factor of application window count"); http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java index f10905a..99dee31 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java @@ -320,7 +320,7 @@ public class PhysicalPlanTest GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); dag.addStream("node1.outport1", node1.outport1, node2.inport1); - dag.setAttribute(node1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(5)); + dag.setOperatorAttribute(node1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(5)); dag.setOutputPortAttribute(node1.outport1, PortContext.UNIFIER_LIMIT, 3); PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext()); List<PTContainer> containers = plan.getContainers(); @@ -347,7 +347,7 @@ public class PhysicalPlanTest GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); dag.addStream("node1.outport1", node1.outport1, node2.inport1); - dag.setAttribute(node1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(8)); + dag.setOperatorAttribute(node1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(8)); dag.setOutputPortAttribute(node1.outport1, PortContext.UNIFIER_LIMIT, 4); PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext()); List<PTContainer> containers = plan.getContainers(); @@ -473,8 +473,8 @@ public class PhysicalPlanTest LogicalPlan dag = new LogicalPlan(); TestInputOperator<Object> o1 = dag.addOperator("o1", new TestInputOperator<>()); OperatorMeta o1Meta = dag.getMeta(o1); - dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); - dag.setAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<TestInputOperator<Object>>(2)); + dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); + dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<TestInputOperator<Object>>(2)); TestPlanContext ctx = new TestPlanContext(); dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx); @@ -649,8 +649,8 @@ public class PhysicalPlanTest dag.addStream("o1.outport1", o1.output, o2.inport1); OperatorMeta o1Meta = dag.getMeta(o1); - dag.setAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); - dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); + dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); + dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); TestPlanContext ctx = new TestPlanContext(); dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx); @@ -979,7 +979,7 @@ public class PhysicalPlanTest GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); - dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); + dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); @@ -1129,13 +1129,13 @@ public class PhysicalPlanTest LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); - dag.setAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<TestGeneratorInputOperator>(2)); - dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch())); + dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<TestGeneratorInputOperator>(2)); + dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch())); OperatorMeta o1Meta = dag.getMeta(o1); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); - dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<TestGeneratorInputOperator>(3)); - dag.setAttribute(o2, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); + dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<TestGeneratorInputOperator>(3)); + dag.setOperatorAttribute(o2, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); OperatorMeta o2Meta = dag.getMeta(o2); dag.addStream("o1.outport1", o1.outport, o2.inport1); @@ -1331,14 +1331,14 @@ public class PhysicalPlanTest LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); - dag.setAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<TestGeneratorInputOperator>(2)); - dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch())); + dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<TestGeneratorInputOperator>(2)); + dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch())); dag.setOutputPortAttribute(o1.outport, PortContext.UNIFIER_SINGLE_FINAL, true); OperatorMeta o1Meta = dag.getMeta(o1); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); - dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<TestGeneratorInputOperator>(3)); - dag.setAttribute(o2, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); + dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<TestGeneratorInputOperator>(3)); + dag.setOperatorAttribute(o2, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); OperatorMeta o2Meta = dag.getMeta(o2); dag.addStream("o1.outport1", o1.outport, o2.inport1); @@ -1524,8 +1524,8 @@ public class PhysicalPlanTest LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); - dag.setAttribute(o1, OperatorContext.PARTITIONER, new TestAugmentingPartitioner<TestGeneratorInputOperator>(3)); - dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch())); + dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new TestAugmentingPartitioner<TestGeneratorInputOperator>(3)); + dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch())); OperatorMeta o1Meta = dag.getMeta(o1); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); @@ -1617,13 +1617,13 @@ public class PhysicalPlanTest o1.partitionKeys = new Integer[] {0,1,2,3}; o1.setPartitionCount(o1.partitionKeys.length); - dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); + dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); dag.setOutputPortAttribute(o1.outport1, PortContext.UNIFIER_LIMIT, 2); OperatorMeta o1Meta = dag.getMeta(o1); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); - dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); + dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); OperatorMeta o2Meta = dag.getMeta(o2); dag.addStream("o1.outport1", o1.outport1, o2.inport1); @@ -1719,13 +1719,13 @@ public class PhysicalPlanTest o1.partitionKeys = new Integer[]{0, 1, 2, 3}; o1.setPartitionCount(3); - dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); + dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); dag.setOutputPortAttribute(o1.outport1, PortContext.UNIFIER_LIMIT, 2); dag.setOutputPortAttribute(o1.outport1, PortContext.UNIFIER_SINGLE_FINAL, true); OperatorMeta o1Meta = dag.getMeta(o1); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); - dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); + dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); OperatorMeta o2Meta = dag.getMeta(o2); dag.addStream("o1.outport1", o1.outport1, o2.inport1); @@ -1826,11 +1826,11 @@ public class PhysicalPlanTest LogicalPlan dag = new LogicalPlan(); GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); - dag.setAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); + dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); OperatorMeta o1Meta = dag.getMeta(o1); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); - dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); + dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); dag.setInputPortAttribute(o2.inport1, PortContext.UNIFIER_SINGLE_FINAL, true); OperatorMeta o2Meta = dag.getMeta(o2); @@ -1880,16 +1880,16 @@ public class PhysicalPlanTest LogicalPlan dag = new LogicalPlan(); GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); - dag.setAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); + dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); OperatorMeta o1Meta = dag.getMeta(o1); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); - dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(4)); + dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(4)); dag.setInputPortAttribute(o2.inport1, PortContext.UNIFIER_SINGLE_FINAL, true); OperatorMeta o2Meta = dag.getMeta(o2); GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); - dag.setAttribute(o3, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); + dag.setOperatorAttribute(o3, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); OperatorMeta o3Meta = dag.getMeta(o3); dag.addStream("o1o2o3", o1.outport1, o2.inport1, o3.inport1); @@ -1945,13 +1945,13 @@ public class PhysicalPlanTest GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); - dag.setAttribute(o1,OperatorContext.VCORES,1); - dag.setAttribute(o2,OperatorContext.VCORES,2); + dag.setOperatorAttribute(o1,OperatorContext.VCORES,1); + dag.setOperatorAttribute(o2,OperatorContext.VCORES,2); dag.addStream("o1.outport1", o1.outport1, o2.inport1); dag.addStream("o2.outport1", o2.outport1, o3.inport1); - dag.setAttribute(o2, OperatorContext.MEMORY_MB, 4000); + dag.setOperatorAttribute(o2, OperatorContext.MEMORY_MB, 4000); dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 2); PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext()); @@ -1976,12 +1976,12 @@ public class PhysicalPlanTest GenericTestOperator o4 = dag.addOperator("o4", GenericTestOperator.class); GenericTestOperator o5 = dag.addOperator("o5", GenericTestOperator.class); GenericTestOperator o6 = dag.addOperator("o6", GenericTestOperator.class); - dag.setAttribute(o1,OperatorContext.VCORES,1); - dag.setAttribute(o2,OperatorContext.VCORES,2); - dag.setAttribute(o3,OperatorContext.VCORES,3); - dag.setAttribute(o4,OperatorContext.VCORES,4); - dag.setAttribute(o5,OperatorContext.VCORES,5); - dag.setAttribute(o6,OperatorContext.VCORES,6); + dag.setOperatorAttribute(o1,OperatorContext.VCORES,1); + dag.setOperatorAttribute(o2,OperatorContext.VCORES,2); + dag.setOperatorAttribute(o3,OperatorContext.VCORES,3); + dag.setOperatorAttribute(o4,OperatorContext.VCORES,4); + dag.setOperatorAttribute(o5,OperatorContext.VCORES,5); + dag.setOperatorAttribute(o6,OperatorContext.VCORES,6); dag.addStream("o1.outport1", o1.outport1, o2.inport1).setLocality(Locality.CONTAINER_LOCAL); dag.addStream("o2.outport1", o2.outport1, o3.inport1, o4.inport1).setLocality(Locality.THREAD_LOCAL); @@ -2005,8 +2005,8 @@ public class PhysicalPlanTest GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); - dag.setAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); - dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); + dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); + dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); dag.addStream("o1.outport1", o1.outport1, o2.inport1); dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 10); @@ -2061,11 +2061,11 @@ public class PhysicalPlanTest LogicalPlan dag = new LogicalPlan(); GenericTestOperator nodeX = dag.addOperator("X", GenericTestOperator.class); - dag.setAttribute(nodeX, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); - dag.setAttribute(nodeX, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList(listener)); + dag.setOperatorAttribute(nodeX, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); + dag.setOperatorAttribute(nodeX, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList(listener)); GenericTestOperator nodeY = dag.addOperator("Y", GenericTestOperator.class); - dag.setAttribute(nodeY, Context.OperatorContext.PARTITIONER, new TestPartitioner<GenericTestOperator>()); + dag.setOperatorAttribute(nodeY, Context.OperatorContext.PARTITIONER, new TestPartitioner<GenericTestOperator>()); GenericTestOperator nodeZ = dag.addOperator("Z", GenericTestOperator.class); @@ -2155,8 +2155,8 @@ public class PhysicalPlanTest GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); - dag.setAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4); - dag.setAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2); + dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4); + dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2); dag.getOperatorMeta("o1").getMeta(o1.outport1).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB, 2000); dag.getOperatorMeta("o1").getMeta(o1.outport2).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB, 4000); @@ -2193,13 +2193,13 @@ public class PhysicalPlanTest GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); - dag.setAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4); - dag.setAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2); - dag.setAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2)); + dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4); + dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2); + dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2)); dag.getOperatorMeta("o1").getMeta(o1.outport1).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB, 1024); - dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2)); - dag.setAttribute(o2, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2); - dag.setAttribute(o2, OperatorContext.APPLICATION_WINDOW_COUNT, 4); + dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2)); + dag.setOperatorAttribute(o2, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2); + dag.setOperatorAttribute(o2, OperatorContext.APPLICATION_WINDOW_COUNT, 4); dag.addStream("o1.outport1", o1.outport1, o2.inport1); dag.addStream("o2.outport1", o2.outport1, o3.inport1); @@ -2216,10 +2216,10 @@ public class PhysicalPlanTest GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); - dag.setAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2); - dag.setAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2)); + dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2); + dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2)); dag.setInputPortAttribute(o2.inport1, PortContext.PARTITION_PARALLEL, true); - dag.setAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4); + dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4); dag.addStream("o1.outport1", o1.outport1, o2.inport1); dag.addStream("o2.outport1", o2.outport1, o3.inport1); http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java index acd370d..e614750 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java @@ -196,11 +196,11 @@ public class OiOStreamTest plan.addStream("OiOout1", intermediateOperator2.output, outputOperator.input).setLocality(Locality.THREAD_LOCAL); plan.addStream("OiOout2", intermediateOperator4.output, outputOperator.input2).setLocality(Locality.THREAD_LOCAL); - plan.setAttribute(inputOperator, OperatorContext.VCORES, 1); - plan.setAttribute(intermediateOperator1, OperatorContext.VCORES, 1); - plan.setAttribute(intermediateOperator2, OperatorContext.VCORES, 2); - plan.setAttribute(intermediateOperator3, OperatorContext.VCORES, 3); - plan.setAttribute(intermediateOperator4, OperatorContext.VCORES, 5); + plan.setOperatorAttribute(inputOperator, OperatorContext.VCORES, 1); + plan.setOperatorAttribute(intermediateOperator1, OperatorContext.VCORES, 1); + plan.setOperatorAttribute(intermediateOperator2, OperatorContext.VCORES, 2); + plan.setOperatorAttribute(intermediateOperator3, OperatorContext.VCORES, 3); + plan.setOperatorAttribute(intermediateOperator4, OperatorContext.VCORES, 5); plan.setAttribute(OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent()); try {
