Repository: apex-core
Updated Branches:
  refs/heads/master 576047e41 -> d6f17f23a


APEXCORE-580 APEXCORE-581 Renamed UserDefinedControlTuple to ControlTuple, 
moved to different package


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d6f17f23
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d6f17f23
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d6f17f23

Branch: refs/heads/master
Commit: d6f17f23a9c11e49a72dc87f76908c71ec235b11
Parents: 576047e
Author: bhupeshchawda <[email protected]>
Authored: Mon Mar 6 18:14:21 2017 +0530
Committer: bhupeshchawda <[email protected]>
Committed: Sun Mar 19 18:54:50 2017 +0530

----------------------------------------------------------------------
 .../api/ControlTupleEnabledSink.java            |  6 +--
 .../apex/api/ControlAwareDefaultInputPort.java  |  5 ++-
 .../apex/api/ControlAwareDefaultOutputPort.java |  9 ++--
 .../apex/api/UserDefinedControlTuple.java       | 46 --------------------
 .../apache/apex/api/operator/ControlTuple.java  | 46 ++++++++++++++++++++
 .../datatorrent/stram/engine/GenericNode.java   | 10 ++---
 .../com/datatorrent/stram/engine/OiONode.java   | 10 ++---
 .../stram/engine/WindowGenerator.java           |  4 +-
 .../stram/stream/BufferServerPublisher.java     |  4 +-
 .../stram/stream/BufferServerSubscriber.java    |  4 +-
 .../datatorrent/stram/stream/FastPublisher.java |  4 +-
 .../datatorrent/stram/stream/InlineStream.java  |  4 +-
 .../com/datatorrent/stram/stream/MuxStream.java |  4 +-
 .../com/datatorrent/stram/stream/OiOStream.java |  4 +-
 .../stram/CustomControlTupleTest.java           |  8 ++--
 15 files changed, 85 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java 
b/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java
index e27003d..eeb952a 100644
--- a/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java
+++ b/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java
@@ -18,7 +18,7 @@
  */
 package com.datatorrent.api;
 
-import org.apache.apex.api.UserDefinedControlTuple;
+import org.apache.apex.api.operator.ControlTuple;
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
@@ -35,7 +35,7 @@ public interface ControlTupleEnabledSink<T> extends Sink<T>
     }
 
     @Override
-    public boolean putControl(UserDefinedControlTuple payload)
+    public boolean putControl(ControlTuple payload)
     {
       return true;
     }
@@ -52,5 +52,5 @@ public interface ControlTupleEnabledSink<T> extends Sink<T>
    *
    * @param payload the control tuple payload
    */
-  public boolean putControl(UserDefinedControlTuple payload);
+  public boolean putControl(ControlTuple payload);
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java
----------------------------------------------------------------------
diff --git 
a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java 
b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java
index ff2b849..07e8343 100644
--- a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java
+++ b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java
@@ -18,6 +18,7 @@
  */
 package org.apache.apex.api;
 
+import org.apache.apex.api.operator.ControlTuple;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.datatorrent.api.ControlTupleEnabledSink;
@@ -31,7 +32,7 @@ import com.datatorrent.api.DefaultInputPort;
 public abstract class ControlAwareDefaultInputPort<T> extends 
DefaultInputPort<T> implements ControlTupleEnabledSink<T>
 {
   @Override
-  public boolean putControl(UserDefinedControlTuple payload)
+  public boolean putControl(ControlTuple payload)
   {
     count++;
     return processControl(payload);
@@ -42,5 +43,5 @@ public abstract class ControlAwareDefaultInputPort<T> extends 
DefaultInputPort<T
    *
    * @param payload the control tuple payload generated by upstream operator(s)
    */
-  public abstract boolean processControl(UserDefinedControlTuple payload);
+  public abstract boolean processControl(ControlTuple payload);
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java
----------------------------------------------------------------------
diff --git 
a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java 
b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java
index 4a83518..77addf9 100644
--- a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java
+++ b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java
@@ -18,6 +18,7 @@
  */
 package org.apache.apex.api;
 
+import org.apache.apex.api.operator.ControlTuple;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.datatorrent.api.ControlTupleEnabledSink;
@@ -25,8 +26,8 @@ import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Sink;
 
 /**
- * Default implementation for an output port which can emit a @{@link 
UserDefinedControlTuple}.
- * The {@link #emitControl(UserDefinedControlTuple)} method can be used to 
emit control tuples onto this output port
+ * Default implementation for an output port which can emit a @{@link 
ControlTuple}.
+ * The {@link #emitControl(ControlTuple)} method can be used to emit control 
tuples onto this output port
  */
 @InterfaceStability.Evolving
 public class ControlAwareDefaultOutputPort<T> extends DefaultOutputPort<T>
@@ -37,10 +38,10 @@ public class ControlAwareDefaultOutputPort<T> extends 
DefaultOutputPort<T>
   }
 
   /**
-   * Allows the operator to emit a @{@link UserDefinedControlTuple}
+   * Allows the operator to emit a @{@link ControlTuple}
    * @param {@link UserDefinedControlTuple}
    */
-  public void emitControl(UserDefinedControlTuple tuple)
+  public void emitControl(ControlTuple tuple)
   {
     verifyOperatorThread();
     ((ControlTupleEnabledSink)sink).putControl(tuple);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java 
b/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java
deleted file mode 100644
index 8e62a8f..0000000
--- a/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.api;
-
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Any user generated control tuple must implement {@link 
UserDefinedControlTuple} interface
- */
[email protected]
-public interface UserDefinedControlTuple
-{
-  /**
-   * A user generated control tuple must specify a @{@link DeliveryType}
-   * @return @{@link DeliveryType} type
-   */
-  DeliveryType getDeliveryType();
-
-  /**
-   * All custom control tuples can be delivered according to the following 
semantics
-   * 1. IMMEDIATE - The control tuple will be delivered immediately to the 
next operator
-   * 2. END_WINDOW - The control tuple will be delivered to the next operator 
just before the
-   * com.datatorrent.api.Operator#endWindow() call.
-   */
-  enum DeliveryType
-  {
-    IMMEDIATE,
-    END_WINDOW
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/api/src/main/java/org/apache/apex/api/operator/ControlTuple.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/operator/ControlTuple.java 
b/api/src/main/java/org/apache/apex/api/operator/ControlTuple.java
new file mode 100644
index 0000000..a1aa6f9
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/operator/ControlTuple.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api.operator;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Any user generated control tuple must implement {@link ControlTuple} 
interface
+ */
[email protected]
+public interface ControlTuple
+{
+  /**
+   * A user generated control tuple must specify a @{@link DeliveryType}
+   * @return @{@link DeliveryType} type
+   */
+  DeliveryType getDeliveryType();
+
+  /**
+   * All custom control tuples can be delivered according to the following 
semantics
+   * 1. IMMEDIATE - The control tuple will be delivered immediately to the 
next operator
+   * 2. END_WINDOW - The control tuple will be delivered to the next operator 
just before the
+   * com.datatorrent.api.Operator#endWindow() call.
+   */
+  enum DeliveryType
+  {
+    IMMEDIATE,
+    END_WINDOW
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java 
b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index dae838d..ec87e40 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -32,7 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.apex.api.ControlAwareDefaultInputPort;
-import org.apache.apex.api.UserDefinedControlTuple;
+import org.apache.apex.api.operator.ControlTuple;
 import org.apache.commons.lang.UnhandledException;
 
 import com.google.common.base.Throwables;
@@ -377,7 +377,7 @@ public class GenericNode extends Node<Operator>
                       if (activeSink instanceof ControlAwareDefaultInputPort) {
                         ControlTupleEnabledSink sink = 
(ControlTupleEnabledSink)activeSink;
                         for (CustomControlTuple cct : portSet.getValue()) {
-                          if 
(!sink.putControl((UserDefinedControlTuple)cct.getUserObject())) {
+                          if 
(!sink.putControl((ControlTuple)cct.getUserObject())) {
                             // operator cannot handle control tuple; forward 
to sinks
                             forwardToSinks(delay, cct);
                           }
@@ -407,11 +407,11 @@ public class GenericNode extends Node<Operator>
                 /* All custom control tuples are expected to be arriving in 
the current window only.*/
                 /* Buffer control tuples until end of the window */
                 CustomControlTuple cct = (CustomControlTuple)t;
-                UserDefinedControlTuple udct = 
(UserDefinedControlTuple)cct.getUserObject();
+                ControlTuple udct = (ControlTuple)cct.getUserObject();
                 boolean forward = false;
 
                 // Handle Immediate Delivery Control Tuples
-                if 
(udct.getDeliveryType().equals(UserDefinedControlTuple.DeliveryType.IMMEDIATE)) 
{
+                if 
(udct.getDeliveryType().equals(ControlTuple.DeliveryType.IMMEDIATE)) {
                   if (!isDuplicate(immediateDeliveryTuples.get(activePort), 
cct)) {
                     // Forward immediately
                     if (reservoirPortMap.isEmpty()) {
@@ -422,7 +422,7 @@ public class GenericNode extends Node<Operator>
                     // activeSink may not be null
                     if (activeSink instanceof ControlAwareDefaultInputPort) {
                       ControlTupleEnabledSink sink = 
(ControlTupleEnabledSink)activeSink;
-                      if 
(!sink.putControl((UserDefinedControlTuple)cct.getUserObject())) {
+                      if (!sink.putControl((ControlTuple)cct.getUserObject())) 
{
                         forward = true;
                       }
                     } else {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java 
b/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java
index e2370ff..5d744fa 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.apex.api.ControlAwareDefaultInputPort;
-import org.apache.apex.api.UserDefinedControlTuple;
+import org.apache.apex.api.operator.ControlTuple;
 import org.apache.commons.lang.UnhandledException;
 
 import com.google.common.collect.Sets;
@@ -98,7 +98,7 @@ public class OiONode extends GenericNode
             for (CustomControlTuple cct : endWindowControlTuples) {
               Sink sink = ((OiOStream.OiOReservoir)reservoir).getSink();
               if (sink instanceof ControlAwareDefaultInputPort) {
-                if 
(!((ControlAwareDefaultInputPort)sink).putControl((UserDefinedControlTuple)cct.getUserObject()))
 {
+                if 
(!((ControlAwareDefaultInputPort)sink).putControl((ControlTuple)cct.getUserObject()))
 {
                   // Operator will not handle; forward to sinks
                   forwardToSinks(false, cct);
                 }
@@ -116,13 +116,13 @@ public class OiONode extends GenericNode
 
         case CUSTOM_CONTROL:
           CustomControlTuple cct = ((CustomControlTuple)t);
-          UserDefinedControlTuple udct = 
(UserDefinedControlTuple)cct.getUserObject();
+          ControlTuple udct = (ControlTuple)cct.getUserObject();
 
-          if 
(udct.getDeliveryType().equals(UserDefinedControlTuple.DeliveryType.IMMEDIATE)) 
{ // Immediate Delivery
+          if 
(udct.getDeliveryType().equals(ControlTuple.DeliveryType.IMMEDIATE)) { // 
Immediate Delivery
             if (!isDuplicate(immediateDeliveryControlTuples, cct)) {
               Sink sink = ((OiOStream.OiOReservoir)reservoir).getSink();
               if (sink instanceof ControlAwareDefaultInputPort) {
-                if 
(!((ControlAwareDefaultInputPort)sink).putControl((UserDefinedControlTuple)cct.getUserObject()))
 {
+                if 
(!((ControlAwareDefaultInputPort)sink).putControl((ControlTuple)cct.getUserObject()))
 {
                   // Operator will not handle; forward to sinks
                   forwardToSinks(false, cct);
                 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java 
b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
index 77ce1f0..c45650f 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
@@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.api.UserDefinedControlTuple;
+import org.apache.apex.api.operator.ControlTuple;
 
 import com.datatorrent.bufferserver.packet.MessageType;
 import com.datatorrent.common.util.ScheduledExecutorService;
@@ -230,7 +230,7 @@ public class WindowGenerator extends MuxReservoir 
implements Stream, Runnable
   }
 
   @Override
-  public boolean putControl(UserDefinedControlTuple payload)
+  public boolean putControl(ControlTuple payload)
   {
     throw new UnsupportedOperationException("Not supported yet.");
   }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java 
b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
index fa2d823..de6aced 100644
--- 
a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
+++ 
b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.api.UserDefinedControlTuple;
+import org.apache.apex.api.operator.ControlTuple;
 
 import com.datatorrent.api.StreamCodec;
 import com.datatorrent.bufferserver.client.Publisher;
@@ -171,7 +171,7 @@ public class BufferServerPublisher extends Publisher 
implements ByteCounterStrea
   }
 
   @Override
-  public boolean putControl(UserDefinedControlTuple payload)
+  public boolean putControl(ControlTuple payload)
   {
     put(new CustomControlTuple(payload));
     return false;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java 
b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
index 606add0..915553e 100644
--- 
a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
+++ 
b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.api.UserDefinedControlTuple;
+import org.apache.apex.api.operator.ControlTuple;
 
 import com.datatorrent.api.Sink;
 import com.datatorrent.api.StreamCodec;
@@ -199,7 +199,7 @@ public class BufferServerSubscriber extends Subscriber 
implements ByteCounterStr
   }
 
   @Override
-  public boolean putControl(UserDefinedControlTuple payload)
+  public boolean putControl(ControlTuple payload)
   {
     throw new UnsupportedOperationException("Not supported yet.");
   }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java 
b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
index 574e61c..69fbb54 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
@@ -28,7 +28,7 @@ import java.nio.channels.SocketChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.api.UserDefinedControlTuple;
+import org.apache.apex.api.operator.ControlTuple;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.KryoException;
@@ -486,7 +486,7 @@ public class FastPublisher extends Kryo implements 
ClientListener, Stream
   }
 
   @Override
-  public boolean putControl(UserDefinedControlTuple payload)
+  public boolean putControl(ControlTuple payload)
   {
     put(new CustomControlTuple(payload));
     return true;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java 
b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
index 7559a18..9e49078 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
@@ -21,7 +21,7 @@ package com.datatorrent.stram.stream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.api.UserDefinedControlTuple;
+import org.apache.apex.api.operator.ControlTuple;
 
 import com.datatorrent.stram.engine.AbstractReservoir;
 import com.datatorrent.stram.engine.Stream;
@@ -102,7 +102,7 @@ public class InlineStream implements Stream
   }
 
   @Override
-  public boolean putControl(UserDefinedControlTuple payload)
+  public boolean putControl(ControlTuple payload)
   {
     put(new CustomControlTuple(payload));
     return false;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java 
b/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java
index 007a3ac..7809c98 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.api.UserDefinedControlTuple;
+import org.apache.apex.api.operator.ControlTuple;
 
 import com.datatorrent.api.Sink;
 import com.datatorrent.stram.engine.Stream;
@@ -123,7 +123,7 @@ public class MuxStream implements 
Stream.MultiSinkCapableStream
   }
 
   @Override
-  public boolean putControl(UserDefinedControlTuple payload)
+  public boolean putControl(ControlTuple payload)
   {
     put(new CustomControlTuple(payload));
     return false;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java 
b/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java
index f4a2b9b..409ff6c 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java
@@ -18,7 +18,7 @@
  */
 package com.datatorrent.stram.stream;
 
-import org.apache.apex.api.UserDefinedControlTuple;
+import org.apache.apex.api.operator.ControlTuple;
 
 import com.datatorrent.api.Sink;
 import com.datatorrent.stram.engine.Stream;
@@ -80,7 +80,7 @@ public class OiOStream implements Stream
   }
 
   @Override
-  public boolean putControl(UserDefinedControlTuple payload)
+  public boolean putControl(ControlTuple payload)
   {
     put(new CustomControlTuple(payload));
     return false;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java 
b/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java
index 86078c2..6ca530b 100644
--- a/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.apex.api.ControlAwareDefaultInputPort;
 import org.apache.apex.api.ControlAwareDefaultOutputPort;
-import org.apache.apex.api.UserDefinedControlTuple;
+import org.apache.apex.api.operator.ControlTuple;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.Context;
@@ -122,7 +122,7 @@ public class CustomControlTupleTest
       }
 
       @Override
-      public boolean processControl(UserDefinedControlTuple tuple)
+      public boolean processControl(ControlTuple tuple)
       {
         output.emitControl(tuple);
         return true;
@@ -145,7 +145,7 @@ public class CustomControlTupleTest
     public final transient ControlAwareDefaultInputPort<Double> input = new 
ControlAwareDefaultInputPort<Double>()
     {
       @Override
-      public boolean processControl(UserDefinedControlTuple payload)
+      public boolean processControl(ControlTuple payload)
       {
         numControlTuples++;
         return false;
@@ -332,7 +332,7 @@ public class CustomControlTupleTest
     testApp(new Application5());
   }
 
-  public static class TestControlTuple implements UserDefinedControlTuple
+  public static class TestControlTuple implements ControlTuple
   {
     public long data;
     public boolean immediate;

Reply via email to