Repository: apex-core
Updated Branches:
  refs/heads/master 9e89de445 -> 558942aa5


APEXCORE-470 - Added the new API setOperatorAttribute and updated the tests;
also marked setAttribute(Operator, ...) as deprecated


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/558942aa
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/558942aa
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/558942aa

Branch: refs/heads/master
Commit: 558942aa584aad9d9de72fbae466e8a2039e79c2
Parents: 9e89de4
Author: sandeshh <[email protected]>
Authored: Tue Jun 7 16:27:26 2016 -0700
Committer: sandeshh <[email protected]>
Committed: Tue Jun 14 16:17:23 2016 -0700

----------------------------------------------------------------------
 api/src/main/java/com/datatorrent/api/DAG.java  |  13 ++-
 .../stram/plan/logical/LogicalPlan.java         |   6 ++
 .../datatorrent/stram/AffinityRulesTest.java    |   6 +-
 .../com/datatorrent/stram/CheckpointTest.java   |   6 +-
 .../com/datatorrent/stram/HostLocalTest.java    |  24 ++---
 .../datatorrent/stram/OutputUnifiedTest.java    |   8 +-
 .../com/datatorrent/stram/PartitioningTest.java |   6 +-
 .../com/datatorrent/stram/StreamCodecTest.java  |  32 +++---
 .../stram/StreamingContainerManagerTest.java    |   8 +-
 .../stram/engine/AutoMetricTest.java            |  10 +-
 .../datatorrent/stram/engine/SliderTest.java    |   4 +-
 .../com/datatorrent/stram/engine/StatsTest.java |   6 +-
 .../stram/plan/StreamPersistanceTests.java      |   4 +-
 .../stram/plan/logical/DelayOperatorTest.java   |   2 +-
 .../stram/plan/logical/LogicalPlanTest.java     |  34 +++----
 .../stram/plan/physical/PhysicalPlanTest.java   | 102 +++++++++----------
 .../datatorrent/stram/stream/OiOStreamTest.java |  10 +-
 17 files changed, 149 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/api/src/main/java/com/datatorrent/api/DAG.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DAG.java 
b/api/src/main/java/com/datatorrent/api/DAG.java
index 1518fcf..b80bc93 100644
--- a/api/src/main/java/com/datatorrent/api/DAG.java
+++ b/api/src/main/java/com/datatorrent/api/DAG.java
@@ -236,11 +236,22 @@ public interface DAG extends DAGContext, Serializable
   public abstract <T> void setAttribute(Attribute<T> key, T value);
 
   /**
-   * <p>setAttribute.</p>
+   * @deprecated
+   * Use {@link #setOperatorAttribute} instead
    */
+  @Deprecated
   public abstract <T> void setAttribute(Operator operator, Attribute<T> key, T 
value);
 
   /**
+   * Set an attribute for an operator.
+   * @param <T> Value type of the attribute.
+   * @param operator The Operator for which the attribute is being set.
+   * @param key The attribute which needs to be tuned.
+   * @param value The new value of the attribute.
+   */
+  public abstract <T> void setOperatorAttribute(Operator operator, 
Attribute<T> key, T value);
+
+  /**
    * <p>setOutputPortAttribute.</p>
    */
   public abstract <T> void setOutputPortAttribute(Operator.OutputPort<?> port, 
Attribute<T> key, T value);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java 
b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index af6b1bc..b301f9e 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -1476,6 +1476,12 @@ public class LogicalPlan implements Serializable, DAG
   @Override
   public <T> void setAttribute(Operator operator, Attribute<T> key, T value)
   {
+    setOperatorAttribute(operator, key, value);
+  }
+
+  @Override
+  public <T> void setOperatorAttribute(Operator operator, Attribute<T> key, T 
value)
+  {
     this.getMeta(operator).attributes.put(key, value);
   }
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/AffinityRulesTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/AffinityRulesTest.java 
b/engine/src/test/java/com/datatorrent/stram/AffinityRulesTest.java
index 88bf133..23e3a97 100644
--- a/engine/src/test/java/com/datatorrent/stram/AffinityRulesTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/AffinityRulesTest.java
@@ -70,7 +70,7 @@ public class AffinityRulesTest
     dag.addStream("stream1", o1.outport, o2.inport1);
     dag.addStream("stream2", o2.outport1, o3.inport1);
 
-    dag.setAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(5));
+    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(5));
 
     AffinityRulesSet ruleSet = new AffinityRulesSet();
     // Valid case:
@@ -137,10 +137,10 @@ public class AffinityRulesTest
     dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
 
     GenericTestOperator o1 = dag.addOperator("O1", GenericTestOperator.class);
-    dag.setAttribute(o1, OperatorContext.MEMORY_MB, 256);
+    dag.setOperatorAttribute(o1, OperatorContext.MEMORY_MB, 256);
 
     GenericTestOperator o2 = dag.addOperator("O2", GenericTestOperator.class);
-    dag.setAttribute(o2, OperatorContext.MEMORY_MB, 256);
+    dag.setOperatorAttribute(o2, OperatorContext.MEMORY_MB, 256);
 
     dag.getMeta(o1).getAttributes().put(OperatorContext.LOCALITY_HOST, 
"host1");
     AffinityRulesSet ruleSet = new AffinityRulesSet();

http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java 
b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index 5675b53..db939cd 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -139,7 +139,7 @@ public class CheckpointTest
     MockInputOperator o1 = dag.addOperator("o1", new MockInputOperator());
 
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
-    dag.setAttribute(o2, OperatorContext.STATELESS, true);
+    dag.setOperatorAttribute(o2, OperatorContext.STATELESS, true);
 
     dag.addStream("o1.outport", o1.outport, 
o2.inport1).setLocality(Locality.CONTAINER_LOCAL);
 
@@ -433,7 +433,7 @@ public class CheckpointTest
     
dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new 
MemoryStorageAgent());
 
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
-    dag.setAttribute(o1, OperatorContext.TIMEOUT_WINDOW_COUNT, 2);
+    dag.setOperatorAttribute(o1, OperatorContext.TIMEOUT_WINDOW_COUNT, 2);
 
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
 
@@ -495,7 +495,7 @@ public class CheckpointTest
     MockInputOperator o1 = dag.addOperator("o1", new MockInputOperator());
 
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
-    dag.setAttribute(o2, OperatorContext.STATELESS, true);
+    dag.setOperatorAttribute(o2, OperatorContext.STATELESS, true);
 
     dag.addStream("o1.outport", o1.outport, o2.inport1);
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/HostLocalTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/HostLocalTest.java 
b/engine/src/test/java/com/datatorrent/stram/HostLocalTest.java
index a2c58cb..b271949 100644
--- a/engine/src/test/java/com/datatorrent/stram/HostLocalTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/HostLocalTest.java
@@ -114,10 +114,10 @@ public class HostLocalTest
 
 
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
-    dag.setAttribute(o1,OperatorContext.MEMORY_MB,256);
+    dag.setOperatorAttribute(o1,OperatorContext.MEMORY_MB,256);
 
     GenericTestOperator partitioned = dag.addOperator("partitioned", 
GenericTestOperator.class);
-    dag.setAttribute(partitioned,OperatorContext.MEMORY_MB,256);
+    dag.setOperatorAttribute(partitioned,OperatorContext.MEMORY_MB,256);
     
dag.getMeta(partitioned).getAttributes().put(OperatorContext.LOCALITY_HOST, 
"host1");
 
     dag.addStream("o1_outport1", o1.outport1, 
partitioned.inport1).setLocality(Locality.NODE_LOCAL);
@@ -158,8 +158,8 @@ public class HostLocalTest
 
     GenericTestOperator partitioned = dag.addOperator("partitioned", 
GenericTestOperator.class);
     dag.addStream("o1_outport1", o1.outport1, 
partitioned.inport1).setLocality(Locality.THREAD_LOCAL);
-    dag.setAttribute(o1,OperatorContext.MEMORY_MB,256);
-    dag.setAttribute(partitioned,OperatorContext.MEMORY_MB,256);
+    dag.setOperatorAttribute(o1,OperatorContext.MEMORY_MB,256);
+    dag.setOperatorAttribute(partitioned,OperatorContext.MEMORY_MB,256);
 
     StreamingContainerManager scm = new StreamingContainerManager(dag);
 
@@ -196,8 +196,8 @@ public class HostLocalTest
 
     GenericTestOperator partitioned = dag.addOperator("partitioned", 
GenericTestOperator.class);
     dag.addStream("o1_outport1", o1.outport1, 
partitioned.inport1).setLocality(Locality.CONTAINER_LOCAL);
-    dag.setAttribute(o1, OperatorContext.MEMORY_MB, 256);
-    dag.setAttribute(partitioned,OperatorContext.MEMORY_MB,256);
+    dag.setOperatorAttribute(o1, OperatorContext.MEMORY_MB, 256);
+    dag.setOperatorAttribute(partitioned,OperatorContext.MEMORY_MB,256);
 
     StreamingContainerManager scm = new StreamingContainerManager(dag);
 
@@ -232,9 +232,9 @@ public class HostLocalTest
 
     GenericTestOperator partitioned = dag.addOperator("partitioned", 
GenericTestOperator.class);
     dag.addStream("o1_outport1", o1.outport1, 
partitioned.inport1).setLocality(Locality.CONTAINER_LOCAL);
-    dag.setAttribute(o1,OperatorContext.MEMORY_MB,256);
-    dag.setAttribute(o1,OperatorContext.VCORES,1);
-    dag.setAttribute(partitioned,OperatorContext.VCORES,1);
+    dag.setOperatorAttribute(o1,OperatorContext.MEMORY_MB,256);
+    dag.setOperatorAttribute(o1,OperatorContext.VCORES,1);
+    dag.setOperatorAttribute(partitioned,OperatorContext.VCORES,1);
 
     StreamingContainerManager scm = new StreamingContainerManager(dag);
 
@@ -272,9 +272,9 @@ public class HostLocalTest
 
     GenericTestOperator partitioned = dag.addOperator("partitioned", 
GenericTestOperator.class);
     dag.addStream("o1_outport1", o1.outport1, 
partitioned.inport1).setLocality(Locality.CONTAINER_LOCAL);
-    dag.setAttribute(o1, OperatorContext.MEMORY_MB, 256);
-    dag.setAttribute(o1, OperatorContext.VCORES, 2);
-    dag.setAttribute(partitioned, OperatorContext.VCORES, 1);
+    dag.setOperatorAttribute(o1, OperatorContext.MEMORY_MB, 256);
+    dag.setOperatorAttribute(o1, OperatorContext.VCORES, 2);
+    dag.setOperatorAttribute(partitioned, OperatorContext.VCORES, 1);
 
     StreamingContainerManager scm = new StreamingContainerManager(dag);
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java 
b/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java
index 470a666..e83c7e4 100644
--- a/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java
@@ -68,7 +68,7 @@ public class OutputUnifiedTest
     GenericTestOperator op1 = new GenericTestOperator();
     dag.addOperator("op1", op1);
 
-    dag.setAttribute(op1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
+    dag.setOperatorAttribute(op1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
 
     TestOutputOperator op2 = new TestOutputOperator();
     dag.addOperator("op2", op2);
@@ -97,12 +97,12 @@ public class OutputUnifiedTest
     GenericTestOperator op1 = new GenericTestOperator();
     dag.addOperator("op1", op1);
 
-    dag.setAttribute(op1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
+    dag.setOperatorAttribute(op1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
 
     TestOutputOperator op2 = new TestOutputOperator();
     dag.addOperator("op2", op2);
 
-    dag.setAttribute(op2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
+    dag.setOperatorAttribute(op2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
 
     dag.addStream("s1", i1.output, op1.inport1);
     dag.addStream("s2", op1.outport1, op2.inport);
@@ -125,7 +125,7 @@ public class OutputUnifiedTest
     TestInputOperator i1 = new TestInputOperator();
     dag.addOperator("i1", i1);
 
-    dag.setAttribute(i1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
+    dag.setOperatorAttribute(i1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
 
     GenericTestOperator op1 = new GenericTestOperator();
     dag.addOperator("op1", op1);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java 
b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
index 26575a4..4f8becd 100644
--- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
@@ -281,8 +281,8 @@ public class PartitioningTest
 
     CollectorOperator collector = dag.addOperator("partitionedCollector", new 
CollectorOperator());
     collector.prefix = "" + System.identityHashCode(collector);
-    dag.setAttribute(collector, OperatorContext.PARTITIONER, new 
StatelessPartitioner<CollectorOperator>(2));
-    dag.setAttribute(collector, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{new PartitionLoadWatch()}));
+    dag.setOperatorAttribute(collector, OperatorContext.PARTITIONER, new 
StatelessPartitioner<CollectorOperator>(2));
+    dag.setOperatorAttribute(collector, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{new PartitionLoadWatch()}));
     dag.addStream("fromInput", input.output, collector.input);
 
     CollectorOperator singleCollector = dag.addOperator("singleCollector", new 
CollectorOperator());
@@ -424,7 +424,7 @@ public class PartitioningTest
       dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new 
AsyncFSStorageAgent(checkpointDir.getPath(), null));
 
       PartitionableInputOperator input = dag.addOperator("input", new 
PartitionableInputOperator());
-      dag.setAttribute(input, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{new PartitionLoadWatch()}));
+      dag.setOperatorAttribute(input, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{new PartitionLoadWatch()}));
 
       StramLocalCluster lc = new StramLocalCluster(dag);
       lc.setHeartbeatMonitoringEnabled(false);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java 
b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index d9a9ee4..3d3f7b0 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -226,7 +226,7 @@ public class StreamCodecTest
   {
     GenericTestOperator node1 = dag.addOperator("node1", 
GenericTestOperator.class);
     GenericTestOperator node2 = dag.addOperator("node2", 
GenericTestOperator.class);
-    dag.setAttribute(node2, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
+    dag.setOperatorAttribute(node2, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
     TestStreamCodec serDe = new TestStreamCodec();
     dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, 
serDe);
 
@@ -273,9 +273,9 @@ public class StreamCodecTest
   public void testMxNPartitioningStreamCodec()
   {
     GenericTestOperator node1 = dag.addOperator("node1", 
GenericTestOperator.class);
-    dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
+    dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
     GenericTestOperator node2 = dag.addOperator("node2", 
GenericTestOperator.class);
-    dag.setAttribute(node2, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
+    dag.setOperatorAttribute(node2, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
     TestStreamCodec serDe = new TestStreamCodec();
     dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, 
serDe);
 
@@ -337,7 +337,7 @@ public class StreamCodecTest
   public void testParallelPartitioningStreamCodec()
   {
     GenericTestOperator node1 = dag.addOperator("node1", 
GenericTestOperator.class);
-    dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
+    dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
     GenericTestOperator node2 = dag.addOperator("node2", 
GenericTestOperator.class);
     dag.setInputPortAttribute(node2.inport1, 
Context.PortContext.PARTITION_PARALLEL, true);
     TestStreamCodec serDe = new TestStreamCodec();
@@ -478,7 +478,7 @@ public class StreamCodecTest
   {
     GenericTestOperator node1 = dag.addOperator("node1", 
GenericTestOperator.class);
     GenericTestOperator node2 = dag.addOperator("node2", 
GenericTestOperator.class);
-    dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
+    dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
     TestStreamCodec serDe = new TestStreamCodec();
     dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, 
serDe);
     GenericTestOperator node3 = dag.addOperator("node3", 
GenericTestOperator.class);
@@ -609,7 +609,7 @@ public class StreamCodecTest
     GenericTestOperator node1 = dag.addOperator("node1", 
GenericTestOperator.class);
     GenericTestOperator node2 = dag.addOperator("node2", 
GenericTestOperator.class);
     GenericTestOperator node3 = dag.addOperator("node3", 
GenericTestOperator.class);
-    dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
+    dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
     TestStreamCodec serDe = new TestStreamCodec();
     dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, 
serDe);
     TestStreamCodec2 serDe2 = new TestStreamCodec2();
@@ -690,13 +690,13 @@ public class StreamCodecTest
   public void testMxNMultipleStreamCodecs()
   {
     GenericTestOperator node1 = dag.addOperator("node1", 
GenericTestOperator.class);
-    dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
+    dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
     GenericTestOperator node2 = dag.addOperator("node2", 
GenericTestOperator.class);
-    dag.setAttribute(node2, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
+    dag.setOperatorAttribute(node2, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
     TestStreamCodec serDe = new TestStreamCodec();
     dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, 
serDe);
     GenericTestOperator node3 = dag.addOperator("node3", 
GenericTestOperator.class);
-    dag.setAttribute(node3, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
+    dag.setOperatorAttribute(node3, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
     TestStreamCodec serDe2 = new TestStreamCodec();
     dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, 
serDe2);
 
@@ -873,7 +873,7 @@ public class StreamCodecTest
     GenericTestOperator node1 = dag.addOperator("node1", 
GenericTestOperator.class);
     GenericTestOperator node2 = dag.addOperator("node2", 
GenericTestOperator.class);
     GenericTestOperator node3 = dag.addOperator("node3", 
GenericTestOperator.class);
-    dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
+    dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
     dag.setOutputPortAttribute(node1.outport1, 
Context.PortContext.UNIFIER_LIMIT, 2);
     TestStreamCodec serDe = new TestStreamCodec();
     dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, 
serDe);
@@ -955,16 +955,16 @@ public class StreamCodecTest
   public void testDynamicPartitioningStreamCodec()
   {
     GenericTestOperator node1 = dag.addOperator("node1", 
GenericTestOperator.class);
-    dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
-    dag.setAttribute(node1, Context.OperatorContext.STATS_LISTENERS, 
Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
+    dag.setOperatorAttribute(node1, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
+    dag.setOperatorAttribute(node1, Context.OperatorContext.STATS_LISTENERS, 
Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
     GenericTestOperator node2 = dag.addOperator("node2", 
GenericTestOperator.class);
-    dag.setAttribute(node2, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
-    dag.setAttribute(node2, Context.OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
+    dag.setOperatorAttribute(node2, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
+    dag.setOperatorAttribute(node2, Context.OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
     TestStreamCodec serDe = new TestStreamCodec();
     dag.setInputPortAttribute(node2.inport1, Context.PortContext.STREAM_CODEC, 
serDe);
     GenericTestOperator node3 = dag.addOperator("node3", 
GenericTestOperator.class);
-    dag.setAttribute(node3, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
-    dag.setAttribute(node3, Context.OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
+    dag.setOperatorAttribute(node3, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
+    dag.setOperatorAttribute(node3, Context.OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
     TestStreamCodec serDe2 = new TestStreamCodec();
     dag.setInputPortAttribute(node3.inport1, Context.PortContext.STREAM_CODEC, 
serDe2);
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java 
b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 385e1ae..11b938c 100644
--- 
a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ 
b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -273,7 +273,7 @@ public class StreamingContainerManagerTest
     GenericTestOperator node1 = dag.addOperator("node1", 
GenericTestOperator.class);
     PhysicalPlanTest.PartitioningTestOperator node2 = dag.addOperator("node2", 
PhysicalPlanTest.PartitioningTestOperator.class);
     node2.setPartitionCount(3);
-    dag.setAttribute(node2, OperatorContext.SPIN_MILLIS, 10); /* this should 
not affect anything materially */
+    dag.setOperatorAttribute(node2, OperatorContext.SPIN_MILLIS, 10); /* this 
should not affect anything materially */
     dag.setOutputPortAttribute(node2.outport1, PortContext.QUEUE_CAPACITY, 
1111);
     GenericTestOperator node3 = dag.addOperator("node3", 
GenericTestOperator.class);
     dag.setInputPortAttribute(node3.inport1, PortContext.QUEUE_CAPACITY, 2222);
@@ -515,7 +515,7 @@ public class StreamingContainerManagerTest
   public void testProcessHeartbeat() throws Exception
   {
     TestGeneratorInputOperator o1 = dag.addOperator("o1", 
TestGeneratorInputOperator.class);
-    dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new 
StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
+    dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
     dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
 
     StreamingContainerManager scm = new StreamingContainerManager(dag);
@@ -722,7 +722,7 @@ public class StreamingContainerManagerTest
     dag.addStream("stream1", o1.outport1, o2.inport1);
     dag.addStream("stream2", o2.outport1, o3.inport1);
 
-    dag.setAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
+    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
     StreamingContainerManager scm = new StreamingContainerManager(dag);
 
     PhysicalPlan physicalPlan = scm.getPhysicalPlan();
@@ -810,7 +810,7 @@ public class StreamingContainerManagerTest
   {
     TestGeneratorInputOperator o1 = dag.addOperator("o1", 
TestGeneratorInputOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
-    dag.setAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
+    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
     dag.addStream("o1Output1", o1.outport, o2.inport1).setLocality(locality);
 
     int maxContainers = 5;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java 
b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
index 89e9c2f..4b72360 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
@@ -204,7 +204,7 @@ public class AutoMetricTest
 
     TestOperator testOper = dag.addOperator("TestOperator", 
TestOperator.class);
     TestStatsListener sl = new TestStatsListener();
-    dag.setAttribute(testOper, OperatorContext.STATS_LISTENERS, 
Lists.newArrayList((StatsListener)sl));
+    dag.setOperatorAttribute(testOper, OperatorContext.STATS_LISTENERS, 
Lists.newArrayList((StatsListener)sl));
 
     GenericTestOperator collector = dag.addOperator("Collector", new 
GenericTestOperator());
     dag.addStream("TestTuples", testOper.outport, 
collector.inport1).setLocality(Locality.CONTAINER_LOCAL);
@@ -249,7 +249,7 @@ public class AutoMetricTest
 
     OperatorWithMetrics o1 = dag.addOperator("o1", OperatorWithMetrics.class);
     MockAggregator aggregator = new MockAggregator(latch);
-    dag.setAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, 
aggregator);
+    dag.setOperatorAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, 
aggregator);
 
     dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new 
StramTestSupport.MemoryStorageAgent());
 
@@ -277,10 +277,10 @@ public class AutoMetricTest
 
     OperatorWithMetrics o1 = dag.addOperator("o1", OperatorWithMetrics.class);
     MockAggregator aggregator = new MockAggregator(latch);
-    dag.setAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, 
aggregator);
+    dag.setOperatorAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, 
aggregator);
 
     dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new 
StramTestSupport.MemoryStorageAgent());
-    dag.setAttribute(o1, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestGeneratorInputOperator>(2));
+    dag.setOperatorAttribute(o1, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestGeneratorInputOperator>(2));
 
     dag.addStream("TestTuples", inputOperator.outport, o1.inport1);
 
@@ -428,7 +428,7 @@ public class AutoMetricTest
 
     OperatorWithMetricMethod o1 = dag.addOperator("o1", 
OperatorWithMetricMethod.class);
     MockAggregator aggregator = new MockAggregator(latch);
-    dag.setAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, 
aggregator);
+    dag.setOperatorAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, 
aggregator);
 
     dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new 
StramTestSupport.MemoryStorageAgent());
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java 
b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
index c3e18b1..8ba57be 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java
@@ -147,8 +147,8 @@ public class SliderTest
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 100);
     Input input = dag.addOperator("Input", new Input());
     Sum sum = dag.addOperator("Sum", new Sum());
-    dag.setAttribute(sum, OperatorContext.APPLICATION_WINDOW_COUNT, 
applicationWindowCount);
-    dag.setAttribute(sum, OperatorContext.SLIDE_BY_WINDOW_COUNT, 
slideByWindowCount);
+    dag.setOperatorAttribute(sum, OperatorContext.APPLICATION_WINDOW_COUNT, 
applicationWindowCount);
+    dag.setOperatorAttribute(sum, OperatorContext.SLIDE_BY_WINDOW_COUNT, 
slideByWindowCount);
     Validator validate = dag.addOperator("validator", new Validator());
     Validator.numbersValidated = 0;
     validate.numberOfIntegers = applicationWindowCount;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java 
b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
index 3968e4a..4625368 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java
@@ -182,13 +182,13 @@ public class StatsTest
     dag.setAttribute(OperatorContext.STORAGE_AGENT, new 
AsyncFSStorageAgent(workingDir, null));
     TestOperator testOper = dag.addOperator("TestOperator", 
TestOperator.class);
     TestInputStatsListener testInputStatsListener = new 
TestInputStatsListener();
-    dag.setAttribute(testOper, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{testInputStatsListener}));
+    dag.setOperatorAttribute(testOper, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{testInputStatsListener}));
     testOper.setMaxTuples(tupleCount);
     testOper.setEmitInterval(0);
 
     TestCollector collector = dag.addOperator("Collector", new 
TestCollector());
     TestCollectorStatsListener testCollectorStatsListener = new 
TestCollectorStatsListener();
-    dag.setAttribute(collector, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{testCollectorStatsListener}));
+    dag.setOperatorAttribute(collector, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{testCollectorStatsListener}));
     dag.addStream("TestTuples", testOper.outport, 
collector.inport1).setLocality(null);
 
     StramLocalCluster lc = new StramLocalCluster(dag);
@@ -241,7 +241,7 @@ public class StatsTest
 
     TestCollector collector = dag.addOperator("Collector", new 
TestCollector());
     if (statsListener != null) {
-      dag.setAttribute(collector, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{statsListener}));
+      dag.setOperatorAttribute(collector, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{statsListener}));
     }
 
     dag.addStream("TestTuples", testOper.outport, 
collector.inport1).setLocality(locality);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
index 299172f..2457786 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
@@ -950,8 +950,8 @@ public class StreamPersistanceTests
     AscendingNumbersOperator ascend = dag.addOperator("ascend", new 
AscendingNumbersOperator());
 
     final TestReceiverOperator console = dag.addOperator("console", new 
TestReceiverOperator());
-    dag.setAttribute(console, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestReceiverOperator>(2));
-    dag.setAttribute(console, Context.OperatorContext.STATS_LISTENERS, 
Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
+    dag.setOperatorAttribute(console, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestReceiverOperator>(2));
+    dag.setOperatorAttribute(console, Context.OperatorContext.STATS_LISTENERS, 
Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
 
     final PartitionedTestPersistanceOperator console1 = new 
PartitionedTestPersistanceOperator();
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
index 75b20fe..821f4ea 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
@@ -122,7 +122,7 @@ public class DelayOperatorTest
     opC = dag.addOperator("C", GenericTestOperator.class);
     opD = dag.addOperator("D", GenericTestOperator.class);
     opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
-    dag.setAttribute(opDelay, 
Context.OperatorContext.APPLICATION_WINDOW_COUNT, 2);
+    dag.setOperatorAttribute(opDelay, 
Context.OperatorContext.APPLICATION_WINDOW_COUNT, 2);
     dag.addStream("BtoC", opB.outport1, opC.inport1);
     dag.addStream("CtoD", opC.outport1, opD.inport1);
     dag.addStream("CtoDelay", opC.outport2, opDelay.input);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
index 9a4e0e8..d4c68aa 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
@@ -474,7 +474,7 @@ public class LogicalPlanTest
     dag.addStream("Connection", input.outport, operator.input1);
 
 
-    dag.setAttribute(operator, OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestOperatorAnnotationOperator>(2));
+    dag.setOperatorAttribute(operator, OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestOperatorAnnotationOperator>(2));
 
     try {
       dag.validate();
@@ -483,7 +483,7 @@ public class LogicalPlanTest
       Assert.assertEquals("", "Operator " + dag.getMeta(operator).getName() + 
" provides partitioning capabilities but the annotation on the operator class 
declares it non partitionable!", e.getMessage());
     }
 
-    dag.setAttribute(operator, OperatorContext.PARTITIONER, null);
+    dag.setOperatorAttribute(operator, OperatorContext.PARTITIONER, null);
     dag.setInputPortAttribute(operator.input1, PortContext.PARTITION_PARALLEL, 
true);
 
     try {
@@ -545,13 +545,13 @@ public class LogicalPlanTest
     TestGeneratorInputOperator input2 = dag.addOperator("input2", 
TestGeneratorInputOperator.class);
 
     GenericTestOperator amoOper = dag.addOperator("amoOper", 
GenericTestOperator.class);
-    dag.setAttribute(amoOper, OperatorContext.PROCESSING_MODE, 
Operator.ProcessingMode.AT_MOST_ONCE);
+    dag.setOperatorAttribute(amoOper, OperatorContext.PROCESSING_MODE, 
Operator.ProcessingMode.AT_MOST_ONCE);
 
     dag.addStream("input1.outport", input1.outport, amoOper.inport1);
     dag.addStream("input2.outport", input2.outport, amoOper.inport2);
 
     GenericTestOperator outputOper = dag.addOperator("outputOper", 
GenericTestOperator.class);
-    dag.setAttribute(outputOper, OperatorContext.PROCESSING_MODE, 
Operator.ProcessingMode.AT_LEAST_ONCE);
+    dag.setOperatorAttribute(outputOper, OperatorContext.PROCESSING_MODE, 
Operator.ProcessingMode.AT_LEAST_ONCE);
     dag.addStream("aloOper.outport1", amoOper.outport1, outputOper.inport1);
 
     try {
@@ -560,7 +560,7 @@ public class LogicalPlanTest
     } catch (ValidationException ve) {
       Assert.assertEquals("", ve.getMessage(), "Processing mode 
outputOper/AT_LEAST_ONCE not valid for source amoOper/AT_MOST_ONCE");
     }
-    dag.setAttribute(outputOper, OperatorContext.PROCESSING_MODE, null);
+    dag.setOperatorAttribute(outputOper, OperatorContext.PROCESSING_MODE, 
null);
     dag.validate();
 
     OperatorMeta outputOperOm = dag.getMeta(outputOper);
@@ -577,7 +577,7 @@ public class LogicalPlanTest
     TestGeneratorInputOperator input2 = dag.addOperator("input2", 
TestGeneratorInputOperator.class);
 
     GenericTestOperator amoOper = dag.addOperator("amoOper", 
GenericTestOperator.class);
-    dag.setAttribute(amoOper, OperatorContext.PROCESSING_MODE, 
Operator.ProcessingMode.EXACTLY_ONCE);
+    dag.setOperatorAttribute(amoOper, OperatorContext.PROCESSING_MODE, 
Operator.ProcessingMode.EXACTLY_ONCE);
 
     dag.addStream("input1.outport", input1.outport, amoOper.inport1);
     dag.addStream("input2.outport", input2.outport, amoOper.inport2);
@@ -592,7 +592,7 @@ public class LogicalPlanTest
       Assert.assertEquals("", ve.getMessage(), "Processing mode for outputOper 
should be AT_MOST_ONCE for source amoOper/EXACTLY_ONCE");
     }
 
-    dag.setAttribute(outputOper, OperatorContext.PROCESSING_MODE, 
Operator.ProcessingMode.AT_LEAST_ONCE);
+    dag.setOperatorAttribute(outputOper, OperatorContext.PROCESSING_MODE, 
Operator.ProcessingMode.AT_LEAST_ONCE);
 
     try {
       dag.validate();
@@ -602,7 +602,7 @@ public class LogicalPlanTest
     }
 
     // AT_MOST_ONCE is valid
-    dag.setAttribute(outputOper, OperatorContext.PROCESSING_MODE, 
Operator.ProcessingMode.AT_MOST_ONCE);
+    dag.setOperatorAttribute(outputOper, OperatorContext.PROCESSING_MODE, 
Operator.ProcessingMode.AT_MOST_ONCE);
     dag.validate();
   }
 
@@ -803,7 +803,7 @@ public class LogicalPlanTest
     // Operator attribute not serializable test
     dag = new LogicalPlan();
     TestGeneratorInputOperator operator = dag.addOperator("TestOperator", 
TestGeneratorInputOperator.class);
-    dag.setAttribute(operator, attr, new TestAttributeValue());
+    dag.setOperatorAttribute(operator, attr, new TestAttributeValue());
     try {
       dag.validate();
       Assert.fail("Setting not serializable attribute should throw exception");
@@ -961,22 +961,22 @@ public class LogicalPlanTest
     TestGeneratorInputOperator input1 = dag.addOperator("input1", 
TestGeneratorInputOperator.class);
     GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
     dag.addStream("Stream1", input1.outport, x.inport1);
-    dag.setAttribute(x, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
-    dag.setAttribute(x, OperatorContext.APPLICATION_WINDOW_COUNT, 30);
+    dag.setOperatorAttribute(x, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
+    dag.setOperatorAttribute(x, OperatorContext.APPLICATION_WINDOW_COUNT, 30);
     dag.validate();
 
     TestGeneratorInputOperator input2 = dag.addOperator("input2", 
TestGeneratorInputOperator.class);
     CheckpointableWithinAppWindowOperator y = dag.addOperator("y", new 
CheckpointableWithinAppWindowOperator());
     dag.addStream("Stream2", input2.outport, y.inport1);
-    dag.setAttribute(y, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
-    dag.setAttribute(y, OperatorContext.APPLICATION_WINDOW_COUNT, 30);
+    dag.setOperatorAttribute(y, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
+    dag.setOperatorAttribute(y, OperatorContext.APPLICATION_WINDOW_COUNT, 30);
     dag.validate();
 
     TestGeneratorInputOperator input3 = dag.addOperator("input3", 
TestGeneratorInputOperator.class);
     NotCheckpointableWithinAppWindowOperator z = dag.addOperator("z", new 
NotCheckpointableWithinAppWindowOperator());
     dag.addStream("Stream3", input3.outport, z.inport1);
-    dag.setAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
-    dag.setAttribute(z, OperatorContext.APPLICATION_WINDOW_COUNT, 30);
+    dag.setOperatorAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
+    dag.setOperatorAttribute(z, OperatorContext.APPLICATION_WINDOW_COUNT, 30);
     try {
       dag.validate();
       Assert.fail("should fail because chekpoint window count is not a factor 
of application window count");
@@ -984,10 +984,10 @@ public class LogicalPlanTest
       // expected
     }
 
-    dag.setAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 30);
+    dag.setOperatorAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 30);
     dag.validate();
 
-    dag.setAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 45);
+    dag.setOperatorAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 45);
     try {
       dag.validate();
       Assert.fail("should fail because chekpoint window count is not a factor 
of application window count");

http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
index f10905a..99dee31 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
@@ -320,7 +320,7 @@ public class PhysicalPlanTest
     GenericTestOperator node1 = dag.addOperator("node1", 
GenericTestOperator.class);
     GenericTestOperator node2 = dag.addOperator("node2", 
GenericTestOperator.class);
     dag.addStream("node1.outport1", node1.outport1, node2.inport1);
-    dag.setAttribute(node1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(5));
+    dag.setOperatorAttribute(node1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(5));
     dag.setOutputPortAttribute(node1.outport1, PortContext.UNIFIER_LIMIT, 3);
     PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
     List<PTContainer> containers = plan.getContainers();
@@ -347,7 +347,7 @@ public class PhysicalPlanTest
     GenericTestOperator node1 = dag.addOperator("node1", 
GenericTestOperator.class);
     GenericTestOperator node2 = dag.addOperator("node2", 
GenericTestOperator.class);
     dag.addStream("node1.outport1", node1.outport1, node2.inport1);
-    dag.setAttribute(node1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(8));
+    dag.setOperatorAttribute(node1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(8));
     dag.setOutputPortAttribute(node1.outport1, PortContext.UNIFIER_LIMIT, 4);
     PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
     List<PTContainer> containers = plan.getContainers();
@@ -473,8 +473,8 @@ public class PhysicalPlanTest
     LogicalPlan dag = new LogicalPlan();
     TestInputOperator<Object> o1 = dag.addOperator("o1", new 
TestInputOperator<>());
     OperatorMeta o1Meta = dag.getMeta(o1);
-    dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new 
StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
-    dag.setAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestInputOperator<Object>>(2));
+    dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestInputOperator<Object>>(2));
 
     TestPlanContext ctx = new TestPlanContext();
     dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
@@ -649,8 +649,8 @@ public class PhysicalPlanTest
 
     dag.addStream("o1.outport1", o1.output, o2.inport1);
     OperatorMeta o1Meta = dag.getMeta(o1);
-    dag.setAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
-    dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new 
StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
+    dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
 
     TestPlanContext ctx = new TestPlanContext();
     dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
@@ -979,7 +979,7 @@ public class PhysicalPlanTest
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
 
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
-    dag.setAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
+    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
 
     GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
 
@@ -1129,13 +1129,13 @@ public class PhysicalPlanTest
     LogicalPlan dag = new LogicalPlan();
 
     TestGeneratorInputOperator o1 = dag.addOperator("o1", 
TestGeneratorInputOperator.class);
-    dag.setAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestGeneratorInputOperator>(2));
-    dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, 
Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestGeneratorInputOperator>(2));
+    dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, 
Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
     OperatorMeta o1Meta = dag.getMeta(o1);
 
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
-    dag.setAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestGeneratorInputOperator>(3));
-    dag.setAttribute(o2, OperatorContext.STATS_LISTENERS, Arrays.asList(new 
StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
+    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestGeneratorInputOperator>(3));
+    dag.setOperatorAttribute(o2, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
     OperatorMeta o2Meta = dag.getMeta(o2);
 
     dag.addStream("o1.outport1", o1.outport, o2.inport1);
@@ -1331,14 +1331,14 @@ public class PhysicalPlanTest
     LogicalPlan dag = new LogicalPlan();
 
     TestGeneratorInputOperator o1 = dag.addOperator("o1", 
TestGeneratorInputOperator.class);
-    dag.setAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestGeneratorInputOperator>(2));
-    dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, 
Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestGeneratorInputOperator>(2));
+    dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, 
Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
     dag.setOutputPortAttribute(o1.outport, PortContext.UNIFIER_SINGLE_FINAL, 
true);
     OperatorMeta o1Meta = dag.getMeta(o1);
 
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
-    dag.setAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestGeneratorInputOperator>(3));
-    dag.setAttribute(o2, OperatorContext.STATS_LISTENERS, Arrays.asList(new 
StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
+    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<TestGeneratorInputOperator>(3));
+    dag.setOperatorAttribute(o2, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
     OperatorMeta o2Meta = dag.getMeta(o2);
 
     dag.addStream("o1.outport1", o1.outport, o2.inport1);
@@ -1524,8 +1524,8 @@ public class PhysicalPlanTest
     LogicalPlan dag = new LogicalPlan();
 
     TestGeneratorInputOperator o1 = dag.addOperator("o1", 
TestGeneratorInputOperator.class);
-    dag.setAttribute(o1, OperatorContext.PARTITIONER, new 
TestAugmentingPartitioner<TestGeneratorInputOperator>(3));
-    dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, 
Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new 
TestAugmentingPartitioner<TestGeneratorInputOperator>(3));
+    dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, 
Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
     OperatorMeta o1Meta = dag.getMeta(o1);
 
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
@@ -1617,13 +1617,13 @@ public class PhysicalPlanTest
     o1.partitionKeys = new Integer[] {0,1,2,3};
     o1.setPartitionCount(o1.partitionKeys.length);
 
-    dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new 
StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
+    dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
 
     dag.setOutputPortAttribute(o1.outport1, PortContext.UNIFIER_LIMIT, 2);
     OperatorMeta o1Meta = dag.getMeta(o1);
 
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
-    dag.setAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
+    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
     OperatorMeta o2Meta = dag.getMeta(o2);
 
     dag.addStream("o1.outport1", o1.outport1, o2.inport1);
@@ -1719,13 +1719,13 @@ public class PhysicalPlanTest
     o1.partitionKeys = new Integer[]{0, 1, 2, 3};
     o1.setPartitionCount(3);
 
-    dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new 
StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
+    dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, 
Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
     dag.setOutputPortAttribute(o1.outport1, PortContext.UNIFIER_LIMIT, 2);
     dag.setOutputPortAttribute(o1.outport1, PortContext.UNIFIER_SINGLE_FINAL, 
true);
     OperatorMeta o1Meta = dag.getMeta(o1);
 
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
-    dag.setAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
+    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
     OperatorMeta o2Meta = dag.getMeta(o2);
 
     dag.addStream("o1.outport1", o1.outport1, o2.inport1);
@@ -1826,11 +1826,11 @@ public class PhysicalPlanTest
     LogicalPlan dag = new LogicalPlan();
 
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
-    dag.setAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
     OperatorMeta o1Meta = dag.getMeta(o1);
 
     GenericTestOperator o2 =  dag.addOperator("o2", GenericTestOperator.class);
-    dag.setAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
+    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
     dag.setInputPortAttribute(o2.inport1, PortContext.UNIFIER_SINGLE_FINAL, 
true);
     OperatorMeta o2Meta = dag.getMeta(o2);
 
@@ -1880,16 +1880,16 @@ public class PhysicalPlanTest
     LogicalPlan dag = new LogicalPlan();
 
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
-    dag.setAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
     OperatorMeta o1Meta = dag.getMeta(o1);
 
     GenericTestOperator o2 =  dag.addOperator("o2", GenericTestOperator.class);
-    dag.setAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(4));
+    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(4));
     dag.setInputPortAttribute(o2.inport1, PortContext.UNIFIER_SINGLE_FINAL, 
true);
     OperatorMeta o2Meta = dag.getMeta(o2);
 
     GenericTestOperator o3 =  dag.addOperator("o3", GenericTestOperator.class);
-    dag.setAttribute(o3, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
+    dag.setOperatorAttribute(o3, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
     OperatorMeta o3Meta = dag.getMeta(o3);
 
     dag.addStream("o1o2o3", o1.outport1, o2.inport1, o3.inport1);
@@ -1945,13 +1945,13 @@ public class PhysicalPlanTest
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
     GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
-    dag.setAttribute(o1,OperatorContext.VCORES,1);
-    dag.setAttribute(o2,OperatorContext.VCORES,2);
+    dag.setOperatorAttribute(o1,OperatorContext.VCORES,1);
+    dag.setOperatorAttribute(o2,OperatorContext.VCORES,2);
 
     dag.addStream("o1.outport1", o1.outport1, o2.inport1);
     dag.addStream("o2.outport1", o2.outport1, o3.inport1);
 
-    dag.setAttribute(o2, OperatorContext.MEMORY_MB, 4000);
+    dag.setOperatorAttribute(o2, OperatorContext.MEMORY_MB, 4000);
     dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
 
     PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
@@ -1976,12 +1976,12 @@ public class PhysicalPlanTest
     GenericTestOperator o4 = dag.addOperator("o4", GenericTestOperator.class);
     GenericTestOperator o5 = dag.addOperator("o5", GenericTestOperator.class);
     GenericTestOperator o6 = dag.addOperator("o6", GenericTestOperator.class);
-    dag.setAttribute(o1,OperatorContext.VCORES,1);
-    dag.setAttribute(o2,OperatorContext.VCORES,2);
-    dag.setAttribute(o3,OperatorContext.VCORES,3);
-    dag.setAttribute(o4,OperatorContext.VCORES,4);
-    dag.setAttribute(o5,OperatorContext.VCORES,5);
-    dag.setAttribute(o6,OperatorContext.VCORES,6);
+    dag.setOperatorAttribute(o1,OperatorContext.VCORES,1);
+    dag.setOperatorAttribute(o2,OperatorContext.VCORES,2);
+    dag.setOperatorAttribute(o3,OperatorContext.VCORES,3);
+    dag.setOperatorAttribute(o4,OperatorContext.VCORES,4);
+    dag.setOperatorAttribute(o5,OperatorContext.VCORES,5);
+    dag.setOperatorAttribute(o6,OperatorContext.VCORES,6);
 
     dag.addStream("o1.outport1", o1.outport1, 
o2.inport1).setLocality(Locality.CONTAINER_LOCAL);
     dag.addStream("o2.outport1", o2.outport1, o3.inport1, 
o4.inport1).setLocality(Locality.THREAD_LOCAL);
@@ -2005,8 +2005,8 @@ public class PhysicalPlanTest
 
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
-    dag.setAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
-    dag.setAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(3));
+    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
 
     dag.addStream("o1.outport1", o1.outport1, o2.inport1);
     dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 10);
@@ -2061,11 +2061,11 @@ public class PhysicalPlanTest
     LogicalPlan dag = new LogicalPlan();
 
     GenericTestOperator nodeX = dag.addOperator("X", 
GenericTestOperator.class);
-    dag.setAttribute(nodeX, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
-    dag.setAttribute(nodeX, Context.OperatorContext.STATS_LISTENERS, 
Lists.newArrayList(listener));
+    dag.setOperatorAttribute(nodeX, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<GenericTestOperator>(2));
+    dag.setOperatorAttribute(nodeX, Context.OperatorContext.STATS_LISTENERS, 
Lists.newArrayList(listener));
 
     GenericTestOperator nodeY = dag.addOperator("Y", 
GenericTestOperator.class);
-    dag.setAttribute(nodeY, Context.OperatorContext.PARTITIONER, new 
TestPartitioner<GenericTestOperator>());
+    dag.setOperatorAttribute(nodeY, Context.OperatorContext.PARTITIONER, new 
TestPartitioner<GenericTestOperator>());
 
     GenericTestOperator nodeZ = dag.addOperator("Z", 
GenericTestOperator.class);
 
@@ -2155,8 +2155,8 @@ public class PhysicalPlanTest
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
     GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
-    dag.setAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
-    dag.setAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
+    dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
+    dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
     
dag.getOperatorMeta("o1").getMeta(o1.outport1).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB,
 2000);
     
dag.getOperatorMeta("o1").getMeta(o1.outport2).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB,
 4000);
 
@@ -2193,13 +2193,13 @@ public class PhysicalPlanTest
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
     GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
-    dag.setAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
-    dag.setAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
-    dag.setAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<Operator>(2));
+    dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
+    dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<Operator>(2));
     
dag.getOperatorMeta("o1").getMeta(o1.outport1).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB,
 1024);
-    dag.setAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<Operator>(2));
-    dag.setAttribute(o2, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
-    dag.setAttribute(o2, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
+    dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new 
StatelessPartitioner<Operator>(2));
+    dag.setOperatorAttribute(o2, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
+    dag.setOperatorAttribute(o2, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
 
     dag.addStream("o1.outport1", o1.outport1, o2.inport1);
     dag.addStream("o2.outport1", o2.outport1, o3.inport1);
@@ -2216,10 +2216,10 @@ public class PhysicalPlanTest
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
     GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
-    dag.setAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
-    dag.setAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<Operator>(2));
+    dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
+    dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new 
StatelessPartitioner<Operator>(2));
     dag.setInputPortAttribute(o2.inport1, PortContext.PARTITION_PARALLEL, 
true);
-    dag.setAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
+    dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
 
     dag.addStream("o1.outport1", o1.outport1, o2.inport1);
     dag.addStream("o2.outport1", o2.outport1, o3.inport1);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/558942aa/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java
index acd370d..e614750 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOStreamTest.java
@@ -196,11 +196,11 @@ public class OiOStreamTest
     plan.addStream("OiOout1", intermediateOperator2.output, 
outputOperator.input).setLocality(Locality.THREAD_LOCAL);
     plan.addStream("OiOout2", intermediateOperator4.output, 
outputOperator.input2).setLocality(Locality.THREAD_LOCAL);
 
-    plan.setAttribute(inputOperator, OperatorContext.VCORES, 1);
-    plan.setAttribute(intermediateOperator1, OperatorContext.VCORES, 1);
-    plan.setAttribute(intermediateOperator2, OperatorContext.VCORES, 2);
-    plan.setAttribute(intermediateOperator3, OperatorContext.VCORES, 3);
-    plan.setAttribute(intermediateOperator4, OperatorContext.VCORES, 5);
+    plan.setOperatorAttribute(inputOperator, OperatorContext.VCORES, 1);
+    plan.setOperatorAttribute(intermediateOperator1, OperatorContext.VCORES, 
1);
+    plan.setOperatorAttribute(intermediateOperator2, OperatorContext.VCORES, 
2);
+    plan.setOperatorAttribute(intermediateOperator3, OperatorContext.VCORES, 
3);
+    plan.setOperatorAttribute(intermediateOperator4, OperatorContext.VCORES, 
5);
     plan.setAttribute(OperatorContext.STORAGE_AGENT, new 
StramTestSupport.MemoryStorageAgent());
 
     try {

Reply via email to