Repository: apex-core Updated Branches: refs/heads/master 576047e41 -> d6f17f23a
APEXCORE-580 APEXCORE-581 Renamed UserDefinedControlTuple to ControlTuple, moved to different package Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d6f17f23 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d6f17f23 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d6f17f23 Branch: refs/heads/master Commit: d6f17f23a9c11e49a72dc87f76908c71ec235b11 Parents: 576047e Author: bhupeshchawda <[email protected]> Authored: Mon Mar 6 18:14:21 2017 +0530 Committer: bhupeshchawda <[email protected]> Committed: Sun Mar 19 18:54:50 2017 +0530 ---------------------------------------------------------------------- .../api/ControlTupleEnabledSink.java | 6 +-- .../apex/api/ControlAwareDefaultInputPort.java | 5 ++- .../apex/api/ControlAwareDefaultOutputPort.java | 9 ++-- .../apex/api/UserDefinedControlTuple.java | 46 -------------------- .../apache/apex/api/operator/ControlTuple.java | 46 ++++++++++++++++++++ .../datatorrent/stram/engine/GenericNode.java | 10 ++--- .../com/datatorrent/stram/engine/OiONode.java | 10 ++--- .../stram/engine/WindowGenerator.java | 4 +- .../stram/stream/BufferServerPublisher.java | 4 +- .../stram/stream/BufferServerSubscriber.java | 4 +- .../datatorrent/stram/stream/FastPublisher.java | 4 +- .../datatorrent/stram/stream/InlineStream.java | 4 +- .../com/datatorrent/stram/stream/MuxStream.java | 4 +- .../com/datatorrent/stram/stream/OiOStream.java | 4 +- .../stram/CustomControlTupleTest.java | 8 ++-- 15 files changed, 85 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java b/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java index e27003d..eeb952a 100644 --- a/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java +++ b/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java @@ -18,7 +18,7 @@ */ package com.datatorrent.api; -import org.apache.apex.api.UserDefinedControlTuple; +import org.apache.apex.api.operator.ControlTuple; import org.apache.hadoop.classification.InterfaceStability; /** @@ -35,7 +35,7 @@ public interface ControlTupleEnabledSink<T> extends Sink<T> } @Override - public boolean putControl(UserDefinedControlTuple payload) + public boolean putControl(ControlTuple payload) { return true; } @@ -52,5 +52,5 @@ public interface ControlTupleEnabledSink<T> extends Sink<T> * * @param payload the control tuple payload */ - public boolean putControl(UserDefinedControlTuple payload); + public boolean putControl(ControlTuple payload); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java index ff2b849..07e8343 100644 --- a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java +++ b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java @@ -18,6 +18,7 @@ */ package org.apache.apex.api; +import org.apache.apex.api.operator.ControlTuple; import org.apache.hadoop.classification.InterfaceStability; import com.datatorrent.api.ControlTupleEnabledSink; @@ -31,7 +32,7 @@ import com.datatorrent.api.DefaultInputPort; public abstract class ControlAwareDefaultInputPort<T> extends DefaultInputPort<T> implements ControlTupleEnabledSink<T> { @Override - public boolean putControl(UserDefinedControlTuple payload) + public boolean putControl(ControlTuple payload) { count++; return processControl(payload); @@ -42,5 +43,5 @@ public abstract class ControlAwareDefaultInputPort<T> extends DefaultInputPort<T * * @param payload the control tuple payload generated by upstream operator(s) */ - public abstract boolean processControl(UserDefinedControlTuple payload); + public abstract boolean processControl(ControlTuple payload); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java index 4a83518..77addf9 100644 --- a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java +++ b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java @@ -18,6 +18,7 @@ */ package org.apache.apex.api; +import org.apache.apex.api.operator.ControlTuple; import org.apache.hadoop.classification.InterfaceStability; import com.datatorrent.api.ControlTupleEnabledSink; @@ -25,8 +26,8 @@ import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.Sink; /** - * Default implementation for an output port which can emit a @{@link UserDefinedControlTuple}. - * The {@link #emitControl(UserDefinedControlTuple)} method can be used to emit control tuples onto this output port + * Default implementation for an output port which can emit a @{@link ControlTuple}. + * The {@link #emitControl(ControlTuple)} method can be used to emit control tuples onto this output port */ @InterfaceStability.Evolving public class ControlAwareDefaultOutputPort<T> extends DefaultOutputPort<T> @@ -37,10 +38,10 @@ public class ControlAwareDefaultOutputPort<T> extends DefaultOutputPort<T> } /** - * Allows the operator to emit a @{@link UserDefinedControlTuple} + * Allows the operator to emit a @{@link ControlTuple} * @param {@link UserDefinedControlTuple} */ - public void emitControl(UserDefinedControlTuple tuple) + public void emitControl(ControlTuple tuple) { verifyOperatorThread(); ((ControlTupleEnabledSink)sink).putControl(tuple); http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java b/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java deleted file mode 100644 index 8e62a8f..0000000 --- a/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.api; - -import org.apache.hadoop.classification.InterfaceStability; - -/** - * Any user generated control tuple must implement {@link UserDefinedControlTuple} interface - */ [email protected] -public interface UserDefinedControlTuple -{ - /** - * A user generated control tuple must specify a @{@link DeliveryType} - * @return @{@link DeliveryType} type - */ - DeliveryType getDeliveryType(); - - /** - * All custom control tuples can be delivered according to the following semantics - * 1. IMMEDIATE - The control tuple will be delivered immediately to the next operator - * 2. END_WINDOW - The control tuple will be delivered to the next operator just before the - * com.datatorrent.api.Operator#endWindow() call. - */ - enum DeliveryType - { - IMMEDIATE, - END_WINDOW - } -} http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/api/src/main/java/org/apache/apex/api/operator/ControlTuple.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/operator/ControlTuple.java b/api/src/main/java/org/apache/apex/api/operator/ControlTuple.java new file mode 100644 index 0000000..a1aa6f9 --- /dev/null +++ b/api/src/main/java/org/apache/apex/api/operator/ControlTuple.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.api.operator; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Any user generated control tuple must implement {@link ControlTuple} interface + */ [email protected] +public interface ControlTuple +{ + /** + * A user generated control tuple must specify a @{@link DeliveryType} + * @return @{@link DeliveryType} type + */ + DeliveryType getDeliveryType(); + + /** + * All custom control tuples can be delivered according to the following semantics + * 1. IMMEDIATE - The control tuple will be delivered immediately to the next operator + * 2. END_WINDOW - The control tuple will be delivered to the next operator just before the + * com.datatorrent.api.Operator#endWindow() call. + */ + enum DeliveryType + { + IMMEDIATE, + END_WINDOW + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java index dae838d..ec87e40 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java @@ -32,7 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.apex.api.ControlAwareDefaultInputPort; -import org.apache.apex.api.UserDefinedControlTuple; +import org.apache.apex.api.operator.ControlTuple; import org.apache.commons.lang.UnhandledException; import com.google.common.base.Throwables; @@ -377,7 +377,7 @@ public class GenericNode extends Node<Operator> if (activeSink instanceof ControlAwareDefaultInputPort) { ControlTupleEnabledSink sink = (ControlTupleEnabledSink)activeSink; for (CustomControlTuple cct : portSet.getValue()) { - if (!sink.putControl((UserDefinedControlTuple)cct.getUserObject())) { + if (!sink.putControl((ControlTuple)cct.getUserObject())) { // operator cannot handle control tuple; forward to sinks forwardToSinks(delay, cct); } @@ -407,11 +407,11 @@ public class GenericNode extends Node<Operator> /* All custom control tuples are expected to be arriving in the current window only.*/ /* Buffer control tuples until end of the window */ CustomControlTuple cct = (CustomControlTuple)t; - UserDefinedControlTuple udct = (UserDefinedControlTuple)cct.getUserObject(); + ControlTuple udct = (ControlTuple)cct.getUserObject(); boolean forward = false; // Handle Immediate Delivery Control Tuples - if (udct.getDeliveryType().equals(UserDefinedControlTuple.DeliveryType.IMMEDIATE)) { + if (udct.getDeliveryType().equals(ControlTuple.DeliveryType.IMMEDIATE)) { if (!isDuplicate(immediateDeliveryTuples.get(activePort), cct)) { // Forward immediately if (reservoirPortMap.isEmpty()) { @@ -422,7 +422,7 @@ public class GenericNode extends Node<Operator> // activeSink may not be null if (activeSink instanceof ControlAwareDefaultInputPort) { ControlTupleEnabledSink sink = (ControlTupleEnabledSink)activeSink; - if (!sink.putControl((UserDefinedControlTuple)cct.getUserObject())) { + if (!sink.putControl((ControlTuple)cct.getUserObject())) { forward = true; } } else { http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java b/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java index e2370ff..5d744fa 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java @@ -26,7 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.apex.api.ControlAwareDefaultInputPort; -import org.apache.apex.api.UserDefinedControlTuple; +import org.apache.apex.api.operator.ControlTuple; import org.apache.commons.lang.UnhandledException; import com.google.common.collect.Sets; @@ -98,7 +98,7 @@ public class OiONode extends GenericNode for (CustomControlTuple cct : endWindowControlTuples) { Sink sink = ((OiOStream.OiOReservoir)reservoir).getSink(); if (sink instanceof ControlAwareDefaultInputPort) { - if (!((ControlAwareDefaultInputPort)sink).putControl((UserDefinedControlTuple)cct.getUserObject())) { + if (!((ControlAwareDefaultInputPort)sink).putControl((ControlTuple)cct.getUserObject())) { // Operator will not handle; forward to sinks forwardToSinks(false, cct); } @@ -116,13 +116,13 @@ public class OiONode extends GenericNode case CUSTOM_CONTROL: CustomControlTuple cct = ((CustomControlTuple)t); - UserDefinedControlTuple udct = (UserDefinedControlTuple)cct.getUserObject(); + ControlTuple udct = (ControlTuple)cct.getUserObject(); - if (udct.getDeliveryType().equals(UserDefinedControlTuple.DeliveryType.IMMEDIATE)) { // Immediate Delivery + if (udct.getDeliveryType().equals(ControlTuple.DeliveryType.IMMEDIATE)) { // Immediate Delivery if (!isDuplicate(immediateDeliveryControlTuples, cct)) { Sink sink = ((OiOStream.OiOReservoir)reservoir).getSink(); if (sink instanceof ControlAwareDefaultInputPort) { - if (!((ControlAwareDefaultInputPort)sink).putControl((UserDefinedControlTuple)cct.getUserObject())) { + if (!((ControlAwareDefaultInputPort)sink).putControl((ControlTuple)cct.getUserObject())) { // Operator will not handle; forward to sinks forwardToSinks(false, cct); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java index 77ce1f0..c45650f 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java @@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.apex.api.UserDefinedControlTuple; +import org.apache.apex.api.operator.ControlTuple; import com.datatorrent.bufferserver.packet.MessageType; import com.datatorrent.common.util.ScheduledExecutorService; @@ -230,7 +230,7 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable } @Override - public boolean putControl(UserDefinedControlTuple payload) + public boolean putControl(ControlTuple payload) { throw new UnsupportedOperationException("Not supported yet."); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java index fa2d823..de6aced 100644 --- a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java +++ b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.apex.api.UserDefinedControlTuple; +import org.apache.apex.api.operator.ControlTuple; import com.datatorrent.api.StreamCodec; import com.datatorrent.bufferserver.client.Publisher; @@ -171,7 +171,7 @@ public class BufferServerPublisher extends Publisher implements ByteCounterStrea } @Override - public boolean putControl(UserDefinedControlTuple payload) + public boolean putControl(ControlTuple payload) { put(new CustomControlTuple(payload)); return false; http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java index 606add0..915553e 100644 --- a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java +++ b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java @@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.apex.api.UserDefinedControlTuple; +import org.apache.apex.api.operator.ControlTuple; import com.datatorrent.api.Sink; import com.datatorrent.api.StreamCodec; @@ -199,7 +199,7 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr } @Override - public boolean putControl(UserDefinedControlTuple payload) + public boolean putControl(ControlTuple payload) { throw new UnsupportedOperationException("Not supported yet."); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java index 574e61c..69fbb54 100644 --- a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java +++ b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java @@ -28,7 +28,7 @@ import java.nio.channels.SocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.apex.api.UserDefinedControlTuple; +import org.apache.apex.api.operator.ControlTuple; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.KryoException; @@ -486,7 +486,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream } @Override - public boolean putControl(UserDefinedControlTuple payload) + public boolean putControl(ControlTuple payload) { put(new CustomControlTuple(payload)); return true; http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java index 7559a18..9e49078 100644 --- a/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java +++ b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java @@ -21,7 +21,7 @@ package com.datatorrent.stram.stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.apex.api.UserDefinedControlTuple; +import org.apache.apex.api.operator.ControlTuple; import com.datatorrent.stram.engine.AbstractReservoir; import com.datatorrent.stram.engine.Stream; @@ -102,7 +102,7 @@ public class InlineStream implements Stream } @Override - public boolean putControl(UserDefinedControlTuple payload) + public boolean putControl(ControlTuple payload) { put(new CustomControlTuple(payload)); return false; http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java b/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java index 007a3ac..7809c98 100644 --- a/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java +++ b/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java @@ -24,7 +24,7 @@ import java.util.HashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.apex.api.UserDefinedControlTuple; +import org.apache.apex.api.operator.ControlTuple; import com.datatorrent.api.Sink; import com.datatorrent.stram.engine.Stream; @@ -123,7 +123,7 @@ public class MuxStream implements Stream.MultiSinkCapableStream } @Override - public boolean putControl(UserDefinedControlTuple payload) + public boolean putControl(ControlTuple payload) { put(new CustomControlTuple(payload)); return false; http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java b/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java index f4a2b9b..409ff6c 100644 --- a/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java +++ b/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java @@ -18,7 +18,7 @@ */ package com.datatorrent.stram.stream; -import org.apache.apex.api.UserDefinedControlTuple; +import org.apache.apex.api.operator.ControlTuple; import com.datatorrent.api.Sink; import com.datatorrent.stram.engine.Stream; @@ -80,7 +80,7 @@ public class OiOStream implements Stream } @Override - public boolean putControl(UserDefinedControlTuple payload) + public boolean putControl(ControlTuple payload) { put(new CustomControlTuple(payload)); return false; http://git-wip-us.apache.org/repos/asf/apex-core/blob/d6f17f23/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java b/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java index 86078c2..6ca530b 100644 --- a/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java +++ b/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java @@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.apex.api.ControlAwareDefaultInputPort; import org.apache.apex.api.ControlAwareDefaultOutputPort; -import org.apache.apex.api.UserDefinedControlTuple; +import org.apache.apex.api.operator.ControlTuple; import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.Context; @@ -122,7 +122,7 @@ public class CustomControlTupleTest } @Override - public boolean processControl(UserDefinedControlTuple tuple) + public boolean processControl(ControlTuple tuple) { output.emitControl(tuple); return true; @@ -145,7 +145,7 @@ public class CustomControlTupleTest public final transient ControlAwareDefaultInputPort<Double> input = new ControlAwareDefaultInputPort<Double>() { @Override - public boolean processControl(UserDefinedControlTuple payload) + public boolean processControl(ControlTuple payload) { numControlTuples++; return false; @@ -332,7 +332,7 @@ public class CustomControlTupleTest testApp(new Application5()); } - public static class TestControlTuple implements UserDefinedControlTuple + public static class TestControlTuple implements ControlTuple { public long data; public boolean immediate;
