Repository: apex-core Updated Branches: refs/heads/master 3b660c9c1 -> 1e4785671
APEXCORE-580 APEXCORE-581 Support for custom control tuples Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/1e478567 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/1e478567 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/1e478567 Branch: refs/heads/master Commit: 1e47856712dc4fcae40856d27ce8ce2360037a12 Parents: 3b660c9 Author: bhupeshchawda <[email protected]> Authored: Wed Dec 28 15:48:42 2016 +0530 Committer: bhupeshchawda <[email protected]> Committed: Fri Mar 3 11:08:25 2017 +0530 ---------------------------------------------------------------------- .../api/ControlTupleEnabledSink.java | 56 +++ .../com/datatorrent/api/DefaultInputPort.java | 2 +- .../com/datatorrent/api/DefaultOutputPort.java | 21 +- .../apex/api/ControlAwareDefaultInputPort.java | 46 +++ .../apex/api/ControlAwareDefaultOutputPort.java | 60 +++ .../apex/api/UserDefinedControlTuple.java | 46 +++ .../bufferserver/packet/CustomControlTuple.java | 36 ++ .../bufferserver/packet/MessageType.java | 4 + .../datatorrent/bufferserver/packet/Tuple.java | 3 + .../datatorrent/stram/engine/GenericNode.java | 126 ++++++- .../com/datatorrent/stram/engine/OiONode.java | 53 +++ .../com/datatorrent/stram/engine/Stream.java | 3 +- .../datatorrent/stram/engine/UnifierNode.java | 1 - .../stram/engine/WindowGenerator.java | 8 + .../stram/stream/BufferServerPublisher.java | 32 ++ .../stram/stream/BufferServerSubscriber.java | 12 + .../datatorrent/stram/stream/FastPublisher.java | 15 + .../datatorrent/stram/stream/InlineStream.java | 10 + .../com/datatorrent/stram/stream/MuxStream.java | 11 +- .../com/datatorrent/stram/stream/OiOStream.java | 15 + .../stram/tuple/CustomControlTuple.java | 62 +++ .../stram/CustomControlTupleTest.java | 376 +++++++++++++++++++ .../stram/engine/GenericNodeTest.java | 192 ++++++++++ 23 files changed, 1179 insertions(+), 11 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java b/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java new file mode 100644 index 0000000..e27003d --- /dev/null +++ b/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.api; + +import org.apache.apex.api.UserDefinedControlTuple; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * A {@link Sink} which supports adding control tuples + */ +@InterfaceStability.Evolving +public interface ControlTupleEnabledSink<T> extends Sink<T> +{ + public static final ControlTupleEnabledSink<Object> BLACKHOLE = new ControlTupleEnabledSink<Object>() + { + @Override + public void put(Object tuple) + { + } + + @Override + public boolean putControl(UserDefinedControlTuple payload) + { + return true; + } + + @Override + public int getCount(boolean reset) + { + return 0; + } + }; + + /** + * Add a control tuple to the sink + * + * @param payload the control tuple payload + */ + public boolean putControl(UserDefinedControlTuple payload); +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/com/datatorrent/api/DefaultInputPort.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/DefaultInputPort.java b/api/src/main/java/com/datatorrent/api/DefaultInputPort.java index 046a35d..dc8705c 100644 --- a/api/src/main/java/com/datatorrent/api/DefaultInputPort.java +++ b/api/src/main/java/com/datatorrent/api/DefaultInputPort.java @@ -31,7 +31,7 @@ import com.datatorrent.api.Operator.InputPort; */ public abstract class DefaultInputPort<T> implements InputPort<T>, Sink<T> { - private int count; + protected int count; protected boolean connected = false; /** http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java b/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java index 71be22c..acd562f 100644 --- a/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java +++ 
b/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java @@ -37,7 +37,7 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T> public static final String THREAD_AFFINITY_DISABLE_CHECK = "com.datatorrent.api.DefaultOutputPort.thread.check.disable"; private static final Logger logger = LoggerFactory.getLogger(DefaultOutputPort.class); - private transient Sink<Object> sink; + protected transient Sink<Object> sink; private transient Thread operatorThread; /** @@ -45,7 +45,7 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T> */ public DefaultOutputPort() { - this.sink = Sink.BLACKHOLE; + this.sink = ControlTupleEnabledSink.BLACKHOLE; } /** @@ -55,13 +55,18 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T> */ public void emit(T tuple) { + verifyOperatorThread(); + sink.put(tuple); + } + + protected void verifyOperatorThread() + { // operatorThread could be null if setup() never got called. if (operatorThread != null && Thread.currentThread() != operatorThread) { // only under certain modes: enforce this throw new IllegalStateException("Current thread " + Thread.currentThread().getName() + - " is different from the operator thread " + operatorThread.getName()); + " is different from the operator thread " + operatorThread.getName()); } - sink.put(tuple); } /** @@ -70,7 +75,7 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T> * Called by execution engine to inject sink at deployment time. */ @Override - public final void setSink(Sink<Object> s) + public void setSink(Sink<Object> s) { this.sink = s == null ? 
Sink.BLACKHOLE : s; } @@ -83,7 +88,7 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T> */ public boolean isConnected() { - return sink != Sink.BLACKHOLE; + return sink != ControlTupleEnabledSink.BLACKHOLE; } /** @@ -113,4 +118,8 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T> { } + protected Sink<Object> getSink() + { + return sink; + } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java new file mode 100644 index 0000000..ff2b849 --- /dev/null +++ b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.api; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.ControlTupleEnabledSink; +import com.datatorrent.api.DefaultInputPort; + +/** + * Default abstract implementation for an input port which is capable of processing + * @{@link UserDefinedControlTuple} + */ +@InterfaceStability.Evolving +public abstract class ControlAwareDefaultInputPort<T> extends DefaultInputPort<T> implements ControlTupleEnabledSink<T> +{ + @Override + public boolean putControl(UserDefinedControlTuple payload) + { + count++; + return processControl(payload); + } + + /** + * Process the control tuples + * + * @param payload the control tuple payload generated by upstream operator(s) + */ + public abstract boolean processControl(UserDefinedControlTuple payload); +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java new file mode 100644 index 0000000..4a83518 --- /dev/null +++ b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.api; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.ControlTupleEnabledSink; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Sink; + +/** + * Default implementation for an output port which can emit a @{@link UserDefinedControlTuple}. + * The {@link #emitControl(UserDefinedControlTuple)} method can be used to emit control tuples onto this output port + */ +@InterfaceStability.Evolving +public class ControlAwareDefaultOutputPort<T> extends DefaultOutputPort<T> +{ + public ControlAwareDefaultOutputPort() + { + sink = ControlTupleEnabledSink.BLACKHOLE; + } + + /** + * Allows the operator to emit a @{@link UserDefinedControlTuple} + * @param {@link UserDefinedControlTuple} + */ + public void emitControl(UserDefinedControlTuple tuple) + { + verifyOperatorThread(); + ((ControlTupleEnabledSink)sink).putControl(tuple); + } + + public boolean isConnected() + { + return sink != ControlTupleEnabledSink.BLACKHOLE; + } + + @Override + public void setSink(Sink<Object> s) + { + this.sink = (s == null ? 
ControlTupleEnabledSink.BLACKHOLE : s); + } + +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java b/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java new file mode 100644 index 0000000..8e62a8f --- /dev/null +++ b/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.api; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Any user generated control tuple must implement {@link UserDefinedControlTuple} interface + */ +@InterfaceStability.Evolving +public interface UserDefinedControlTuple +{ + /** + * A user generated control tuple must specify a @{@link DeliveryType} + * @return @{@link DeliveryType} type + */ + DeliveryType getDeliveryType(); + + /** + * All custom control tuples can be delivered according to the following semantics + * 1. IMMEDIATE - The control tuple will be delivered immediately to the next operator + * 2. 
END_WINDOW - The control tuple will be delivered to the next operator just before the + * com.datatorrent.api.Operator#endWindow() call. + */ + enum DeliveryType + { + IMMEDIATE, + END_WINDOW + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/CustomControlTuple.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/CustomControlTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/CustomControlTuple.java new file mode 100644 index 0000000..3aca31d --- /dev/null +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/CustomControlTuple.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.bufferserver.packet; + +/** + * Custom Control Tuple class + */ +public class CustomControlTuple extends DataTuple +{ + public CustomControlTuple(byte[] array, int offset, int index) + { + super(array, offset, index); + } + + @Override + public MessageType getType() + { + return MessageType.CUSTOM_CONTROL; + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java index 3c0ec2c..efc4ac3 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java @@ -37,6 +37,7 @@ public enum MessageType RESET_REQUEST(9), CHECKPOINT(10), CODEC_STATE(11), + CUSTOM_CONTROL(12), NO_MESSAGE_ODD(127); public static final byte NO_MESSAGE_VALUE = 0; @@ -51,6 +52,7 @@ public enum MessageType public static final byte RESET_REQUEST_VALUE = 9; public static final byte CHECKPOINT_VALUE = 10; public static final byte CODEC_STATE_VALUE = 11; + public static final byte CUSTOM_CONTROL_VALUE = 12; public static final byte NO_MESSAGE_ODD_VALUE = 127; public final int getNumber() @@ -85,6 +87,8 @@ public enum MessageType return CHECKPOINT; case 11: return CODEC_STATE; + case 12: + return CUSTOM_CONTROL; case 127: return NO_MESSAGE_ODD; default: http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java index 
de3cae8..aae7f68 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java @@ -73,6 +73,9 @@ public abstract class Tuple case END_WINDOW: return new EndWindowTuple(buffer, offset, length); + case CUSTOM_CONTROL: + return new CustomControlTuple(buffer, offset, length); + case END_STREAM: return new WindowIdTuple(buffer, offset, length); http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java index 41acd43..dae838d 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; @@ -30,10 +31,14 @@ import java.util.Map.Entry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.api.ControlAwareDefaultInputPort; +import org.apache.apex.api.UserDefinedControlTuple; import org.apache.commons.lang.UnhandledException; import com.google.common.base.Throwables; +import com.google.common.collect.Maps; +import com.datatorrent.api.ControlTupleEnabledSink; import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.IdleTimeHandler; import com.datatorrent.api.Operator.InputPort; @@ -47,6 +52,7 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerSt import com.datatorrent.stram.debug.TappedReservoir; import com.datatorrent.stram.plan.logical.LogicalPlan; import 
com.datatorrent.stram.plan.logical.Operators; +import com.datatorrent.stram.tuple.CustomControlTuple; import com.datatorrent.stram.tuple.ResetWindowTuple; import com.datatorrent.stram.tuple.Tuple; @@ -67,6 +73,7 @@ public class GenericNode extends Node<Operator> { protected final HashMap<String, SweepableReservoir> inputs = new HashMap<>(); protected ArrayList<DeferredInputConnection> deferredInputConnections = new ArrayList<>(); + protected Map<SweepableReservoir,Sink> reservoirPortMap = Maps.newHashMap(); @Override @SuppressWarnings("unchecked") @@ -249,6 +256,9 @@ public class GenericNode extends Node<Operator> TupleTracker tracker; LinkedList<TupleTracker> resetTupleTracker = new LinkedList<>(); + Map<SweepableReservoir, LinkedHashSet<CustomControlTuple>> immediateDeliveryTuples = Maps.newHashMap(); + Map<SweepableReservoir,LinkedHashSet<CustomControlTuple>> endWindowDeliveryTuples = Maps.newHashMap(); + try { do { Iterator<Map.Entry<String, SweepableReservoir>> buffers = activeQueues.iterator(); @@ -290,8 +300,8 @@ public class GenericNode extends Node<Operator> for (int s = sinks.length; s-- > 0; ) { sinks[s].put(resetWindowTuple); } - controlTupleCount++; } + controlTupleCount++; t.setWindowId(windowAhead); } for (int s = sinks.length; s-- > 0; ) { @@ -354,6 +364,36 @@ public class GenericNode extends Node<Operator> if (delay) { t.setWindowId(windowAhead); } + + /* Emit control tuples here */ + if (reservoirPortMap.isEmpty()) { + populateReservoirInputPortMap(); + } + + + for (Entry<SweepableReservoir,LinkedHashSet<CustomControlTuple>> portSet: endWindowDeliveryTuples.entrySet()) { + Sink activeSink = reservoirPortMap.get(portSet.getKey()); + // activeSink may not be null + if (activeSink instanceof ControlAwareDefaultInputPort) { + ControlTupleEnabledSink sink = (ControlTupleEnabledSink)activeSink; + for (CustomControlTuple cct : portSet.getValue()) { + if (!sink.putControl((UserDefinedControlTuple)cct.getUserObject())) { + // operator cannot handle control 
tuple; forward to sinks + forwardToSinks(delay, cct); + } + } + } else { + // Not a ControlAwarePort. Operator cannot handle a custom control tuple. + for (CustomControlTuple cct : portSet.getValue()) { + forwardToSinks(delay, cct); + } + } + } + + immediateDeliveryTuples.clear(); + endWindowDeliveryTuples.clear(); + + /* Now call endWindow() */ processEndWindow(t); activeQueues.addAll(inputs.entrySet()); expectingBeginWindow = activeQueues.size(); @@ -362,6 +402,53 @@ public class GenericNode extends Node<Operator> } break; + case CUSTOM_CONTROL: + activePort.remove(); + /* All custom control tuples are expected to be arriving in the current window only.*/ + /* Buffer control tuples until end of the window */ + CustomControlTuple cct = (CustomControlTuple)t; + UserDefinedControlTuple udct = (UserDefinedControlTuple)cct.getUserObject(); + boolean forward = false; + + // Handle Immediate Delivery Control Tuples + if (udct.getDeliveryType().equals(UserDefinedControlTuple.DeliveryType.IMMEDIATE)) { + if (!isDuplicate(immediateDeliveryTuples.get(activePort), cct)) { + // Forward immediately + if (reservoirPortMap.isEmpty()) { + populateReservoirInputPortMap(); + } + + Sink activeSink = reservoirPortMap.get(activePort); + // activeSink may not be null + if (activeSink instanceof ControlAwareDefaultInputPort) { + ControlTupleEnabledSink sink = (ControlTupleEnabledSink)activeSink; + if (!sink.putControl((UserDefinedControlTuple)cct.getUserObject())) { + forward = true; + } + } else { + forward = true; + } + + if (forward) { + forwardToSinks(delay, cct); + } + // Add to set + if (!immediateDeliveryTuples.containsKey(activePort)) { + immediateDeliveryTuples.put(activePort, new LinkedHashSet<CustomControlTuple>()); + } + immediateDeliveryTuples.get(activePort).add(cct); + } + } else { + // Buffer EndWindow Delivery Control Tuples + if (!endWindowDeliveryTuples.containsKey(activePort)) { + endWindowDeliveryTuples.put(activePort, new LinkedHashSet<CustomControlTuple>()); + } + 
if (!isDuplicate(endWindowDeliveryTuples.get(activePort), cct)) { + endWindowDeliveryTuples.get(activePort).add(cct); + } + } + break; + case CHECKPOINT: activePort.remove(); long checkpointWindow = t.getWindowId(); @@ -656,6 +743,43 @@ public class GenericNode extends Node<Operator> } + protected void forwardToSinks(boolean delay, Object o) + { + if (!delay) { + for (int s = sinks.length; s-- > 0; ) { + sinks[s].put(o); + } + controlTupleCount++; + } + } + + /** + * Populate {@link #reservoirPortMap} with information on which reservoirs are connected to which input ports + */ + protected void populateReservoirInputPortMap() + { + for (Entry<String,Operators.PortContextPair<InputPort<?>>> entry : descriptor.inputPorts.entrySet()) { + if (entry.getValue().component != null && entry.getValue().component instanceof InputPort) { + if (inputs.containsKey(entry.getKey())) { + reservoirPortMap.put(inputs.get(entry.getKey()), entry.getValue().component.getSink()); + } + } + } + } + + protected boolean isDuplicate(LinkedHashSet<CustomControlTuple> set, CustomControlTuple t) + { + if (set == null || set.isEmpty()) { + return false; + } + for (CustomControlTuple cct : set) { + if (cct.getUid().equals(t.getUid())) { + return true; + } + } + return false; + } + private void fabricateFirstWindow(Operator.DelayOperator delayOperator, long windowAhead) { Tuple beginWindowTuple = new Tuple(MessageType.BEGIN_WINDOW, windowAhead); http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java b/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java index f968b4e..e2370ff 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java @@ -19,16 +19,24 @@ package com.datatorrent.stram.engine; 
import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.Map.Entry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.apex.api.ControlAwareDefaultInputPort; +import org.apache.apex.api.UserDefinedControlTuple; import org.apache.commons.lang.UnhandledException; +import com.google.common.collect.Sets; + import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.InputPort; import com.datatorrent.api.Sink; import com.datatorrent.stram.plan.logical.Operators.PortContextPair; +import com.datatorrent.stram.stream.OiOStream; +import com.datatorrent.stram.tuple.CustomControlTuple; import com.datatorrent.stram.tuple.Tuple; /** @@ -59,6 +67,9 @@ public class OiONode extends GenericNode reservoir = sr; } + private LinkedHashSet<CustomControlTuple> immediateDeliveryControlTuples = Sets.newLinkedHashSet(); + private LinkedHashSet<CustomControlTuple> endWindowControlTuples = Sets.newLinkedHashSet(); + @Override public void put(Tuple t) { @@ -82,10 +93,52 @@ public class OiONode extends GenericNode case END_WINDOW: endWindowDequeueTimes.put(reservoir, System.currentTimeMillis()); if (--expectingEndWindows == 0) { + + /* process custom control tuples here */ + for (CustomControlTuple cct : endWindowControlTuples) { + Sink sink = ((OiOStream.OiOReservoir)reservoir).getSink(); + if (sink instanceof ControlAwareDefaultInputPort) { + if (!((ControlAwareDefaultInputPort)sink).putControl((UserDefinedControlTuple)cct.getUserObject())) { + // Operator will not handle; forward to sinks + forwardToSinks(false, cct); + } + } else { + // Port incapable of handling; forward to sinks + forwardToSinks(false, cct); + } + } + endWindowControlTuples.clear(); + immediateDeliveryControlTuples.clear(); + processEndWindow(t); } break; + case CUSTOM_CONTROL: + CustomControlTuple cct = ((CustomControlTuple)t); + UserDefinedControlTuple udct = (UserDefinedControlTuple)cct.getUserObject(); + + if 
(udct.getDeliveryType().equals(UserDefinedControlTuple.DeliveryType.IMMEDIATE)) { // Immediate Delivery + if (!isDuplicate(immediateDeliveryControlTuples, cct)) { + Sink sink = ((OiOStream.OiOReservoir)reservoir).getSink(); + if (sink instanceof ControlAwareDefaultInputPort) { + if (!((ControlAwareDefaultInputPort)sink).putControl((UserDefinedControlTuple)cct.getUserObject())) { + // Operator will not handle; forward to sinks + forwardToSinks(false, cct); + } + } else { + forwardToSinks(false, cct); + } + // store + immediateDeliveryControlTuples.add(cct); + } + } else { // End Window Delivery + if (!isDuplicate(endWindowControlTuples, cct)) { + endWindowControlTuples.add(cct); + } + } + break; + case CHECKPOINT: dagCheckpointOffsetCount = 0; if (lastCheckpointWindowId < t.getWindowId() && !doCheckpoint) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/engine/Stream.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Stream.java b/engine/src/main/java/com/datatorrent/stram/engine/Stream.java index fc93b38..196134f 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/Stream.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/Stream.java @@ -19,6 +19,7 @@ package com.datatorrent.stram.engine; import com.datatorrent.api.Component; +import com.datatorrent.api.ControlTupleEnabledSink; import com.datatorrent.api.Operator.ActivationListener; import com.datatorrent.api.Sink; @@ -32,7 +33,7 @@ import com.datatorrent.api.Sink; /* * Provides basic interface for a stream object. 
Stram, StramChild work via this interface */ -public interface Stream extends Component<StreamContext>, ActivationListener<StreamContext>, Sink<Object> +public interface Stream extends Component<StreamContext>, ActivationListener<StreamContext>, ControlTupleEnabledSink<Object> { public interface MultiSinkCapableStream extends Stream { http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/engine/UnifierNode.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/UnifierNode.java b/engine/src/main/java/com/datatorrent/stram/engine/UnifierNode.java index e38c94e..57a20b7 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/UnifierNode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/UnifierNode.java @@ -27,7 +27,6 @@ import com.datatorrent.api.Operator.InputPort; import com.datatorrent.api.Operator.Unifier; import com.datatorrent.api.Sink; import com.datatorrent.api.StreamCodec; - import com.datatorrent.stram.plan.logical.Operators.PortContextPair; /** http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java index 3a8438d..77ce1f0 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java @@ -25,6 +25,8 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.api.UserDefinedControlTuple; + import com.datatorrent.bufferserver.packet.MessageType; import com.datatorrent.common.util.ScheduledExecutorService; import 
com.datatorrent.netlet.util.CircularBuffer; @@ -228,6 +230,12 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable } @Override + public boolean putControl(UserDefinedControlTuple payload) + { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override protected Queue getQueue() { return queue; http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java index 7db4892..fa2d823 100644 --- a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java +++ b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java @@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.api.UserDefinedControlTuple; + import com.datatorrent.api.StreamCodec; import com.datatorrent.bufferserver.client.Publisher; import com.datatorrent.bufferserver.packet.BeginWindowTuple; @@ -40,6 +42,7 @@ import com.datatorrent.stram.codec.StatefulStreamCodec; import com.datatorrent.stram.codec.StatefulStreamCodec.DataStatePair; import com.datatorrent.stram.engine.ByteCounterStream; import com.datatorrent.stram.engine.StreamContext; +import com.datatorrent.stram.tuple.CustomControlTuple; import com.datatorrent.stram.tuple.Tuple; import static java.lang.Thread.sleep; @@ -98,6 +101,28 @@ public class BufferServerPublisher extends Publisher implements ByteCounterStrea array = EndWindowTuple.getSerializedTuple((int)t.getWindowId()); break; + case CUSTOM_CONTROL: + if (statefulSerde == null) { + array = com.datatorrent.bufferserver.packet.CustomControlTuple + .getSerializedTuple(MessageType.CUSTOM_CONTROL_VALUE, 
serde.toByteArray(payload)); + } else { + DataStatePair dsp = statefulSerde.toDataStatePair(payload); + if (dsp.state != null) { + array = com.datatorrent.bufferserver.packet.CustomControlTuple + .getSerializedTuple(MessageType.CODEC_STATE_VALUE, dsp.state); + try { + while (!write(array)) { + sleep(5); + } + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } + } + array = com.datatorrent.bufferserver.packet.CustomControlTuple + .getSerializedTuple(MessageType.CUSTOM_CONTROL_VALUE, dsp.data); + } + break; + case END_STREAM: array = EndStreamTuple.getSerializedTuple((int)t.getWindowId()); break; @@ -145,6 +170,13 @@ public class BufferServerPublisher extends Publisher implements ByteCounterStrea } } + @Override + public boolean putControl(UserDefinedControlTuple payload) + { + put(new CustomControlTuple(payload)); + return false; + } + /** * * @param context http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java index d5b0997..606add0 100644 --- a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java +++ b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java @@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.api.UserDefinedControlTuple; + import com.datatorrent.api.Sink; import com.datatorrent.api.StreamCodec; import com.datatorrent.bufferserver.client.Subscriber; @@ -196,6 +198,12 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr throw new UnsupportedOperationException("Not supported yet."); } + @Override + public boolean 
putControl(UserDefinedControlTuple payload) + { + throw new UnsupportedOperationException("Not supported yet."); + } + public SweepableReservoir releaseReservoir(String sinkId) { BufferReservoir r = reservoirMap.remove(sinkId); @@ -343,6 +351,10 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr o = new EndWindowTuple(baseSeconds | (lastWindowId = data.getWindowId())); break; + case CUSTOM_CONTROL: + o = processPayload(data); + break; + case END_STREAM: o = new EndStreamTuple(baseSeconds | data.getWindowId()); break; http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java index 88f2052..574e61c 100644 --- a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java +++ b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java @@ -28,6 +28,8 @@ import java.nio.channels.SocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.api.UserDefinedControlTuple; + import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.io.Output; @@ -44,6 +46,7 @@ import com.datatorrent.netlet.Listener; import com.datatorrent.netlet.Listener.ClientListener; import com.datatorrent.stram.engine.Stream; import com.datatorrent.stram.engine.StreamContext; +import com.datatorrent.stram.tuple.CustomControlTuple; import com.datatorrent.stram.tuple.Tuple; import static java.lang.Thread.sleep; @@ -228,6 +231,11 @@ public class FastPublisher extends Kryo implements ClientListener, Stream array = EndWindowTuple.getSerializedTuple((int)t.getWindowId()); break; + case CUSTOM_CONTROL: + array = null; + // TODO implement + break; + case END_STREAM: array = 
EndStreamTuple.getSerializedTuple((int)t.getWindowId()); break; @@ -477,6 +485,13 @@ public class FastPublisher extends Kryo implements ClientListener, Stream } } + @Override + public boolean putControl(UserDefinedControlTuple payload) + { + put(new CustomControlTuple(payload)); + return true; + } + @SuppressWarnings("SleepWhileInLoop") public void advanceWriteBuffer() { http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java index ec9660b..7559a18 100644 --- a/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java +++ b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java @@ -21,10 +21,13 @@ package com.datatorrent.stram.stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.api.UserDefinedControlTuple; + import com.datatorrent.stram.engine.AbstractReservoir; import com.datatorrent.stram.engine.Stream; import com.datatorrent.stram.engine.StreamContext; import com.datatorrent.stram.engine.SweepableReservoir; +import com.datatorrent.stram.tuple.CustomControlTuple; import com.datatorrent.stram.tuple.Tuple; /** @@ -99,6 +102,13 @@ public class InlineStream implements Stream } @Override + public boolean putControl(UserDefinedControlTuple payload) + { + put(new CustomControlTuple(payload)); + return false; + } + + @Override public int getCount(boolean reset) { try { http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java b/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java index 
63f5ee4..007a3ac 100644 --- a/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java +++ b/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java @@ -24,10 +24,12 @@ import java.util.HashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.api.Sink; +import org.apache.apex.api.UserDefinedControlTuple; +import com.datatorrent.api.Sink; import com.datatorrent.stram.engine.Stream; import com.datatorrent.stram.engine.StreamContext; +import com.datatorrent.stram.tuple.CustomControlTuple; /** * <p>MuxStream class.</p> @@ -121,6 +123,13 @@ public class MuxStream implements Stream.MultiSinkCapableStream } @Override + public boolean putControl(UserDefinedControlTuple payload) + { + put(new CustomControlTuple(payload)); + return false; + } + + @Override public int getCount(boolean reset) { try { http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java b/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java index 61ed0e6..f4a2b9b 100644 --- a/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java +++ b/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java @@ -18,10 +18,13 @@ */ package com.datatorrent.stram.stream; +import org.apache.apex.api.UserDefinedControlTuple; + import com.datatorrent.api.Sink; import com.datatorrent.stram.engine.Stream; import com.datatorrent.stram.engine.StreamContext; import com.datatorrent.stram.engine.SweepableReservoir; +import com.datatorrent.stram.tuple.CustomControlTuple; import com.datatorrent.stram.tuple.Tuple; /** @@ -77,6 +80,13 @@ public class OiOStream implements Stream } @Override + public boolean putControl(UserDefinedControlTuple payload) + { + put(new CustomControlTuple(payload)); + return false; + } + + @Override public int 
getCount(boolean reset) { try { @@ -124,6 +134,11 @@ public class OiOStream implements Stream } } + public Sink<Object> getSink() + { + return OiOStream.this.sink; + } + @Override public Tuple sweep() { http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/tuple/CustomControlTuple.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/tuple/CustomControlTuple.java b/engine/src/main/java/com/datatorrent/stram/tuple/CustomControlTuple.java new file mode 100644 index 0000000..810fa57 --- /dev/null +++ b/engine/src/main/java/com/datatorrent/stram/tuple/CustomControlTuple.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.stram.tuple; + +import java.util.UUID; + +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.esotericsoftware.kryo.serializers.JavaSerializer; + +import com.datatorrent.bufferserver.packet.MessageType; + +/** + * An implementation for @{@link Tuple} which can be generated by the user + * Acts as the wrapper for the user payload + */ +public class CustomControlTuple extends Tuple +{ + private final Object userObject; + @FieldSerializer.Bind(JavaSerializer.class) + private final UUID uid; + + protected CustomControlTuple() + { + // for Kryo + super(MessageType.CUSTOM_CONTROL, 0); + userObject = null; + uid = null; + } + + public CustomControlTuple(Object userObject) + { + super(MessageType.CUSTOM_CONTROL, 0); + this.userObject = userObject; + uid = UUID.randomUUID(); + } + + public Object getUserObject() + { + return userObject; + } + + public UUID getUid() + { + return uid; + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java b/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java new file mode 100644 index 0000000..86078c2 --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java @@ -0,0 +1,376 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram; + +import java.util.concurrent.Callable; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.api.ControlAwareDefaultInputPort; +import org.apache.apex.api.ControlAwareDefaultOutputPort; +import org.apache.apex.api.UserDefinedControlTuple; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.common.util.BaseOperator; + +public class CustomControlTupleTest +{ + public static final Logger LOG = LoggerFactory.getLogger(CustomControlTupleTest.class); + private static long controlIndex = 0; + private static int numControlTuples = 0; + private static boolean done = false; + private static boolean endApp = false; + private static long endingWindowId = 0; + private static boolean immediate = false; + + @Before + public void starting() + { + controlIndex = 0; + numControlTuples = 0; + done = false; + endApp = false; + endingWindowId = 0; + } + + public static class Generator extends BaseOperator implements InputOperator + { 
+ private long currentWindowId; + public final transient ControlAwareDefaultOutputPort<Double> out = new ControlAwareDefaultOutputPort<>(); + + @Override + public void beginWindow(long windowId) + { + if (!done) { + currentWindowId = windowId; + out.emitControl(new TestControlTuple(controlIndex++, immediate)); + } + } + + @Override + public void emitTuples() + { + if (!done) { + out.emitControl(new TestControlTuple(controlIndex++, immediate)); + } + } + + @Override + public void endWindow() + { + if (!done) { + out.emitControl(new TestControlTuple(controlIndex++, immediate)); + endingWindowId = currentWindowId; + done = true; + } + } + } + + public static class DefaultProcessor extends BaseOperator + { + public final transient DefaultInputPort<Double> input = new DefaultInputPort<Double>() + { + @Override + public void process(Double tuple) + { + output.emit(tuple); + } + }; + + public final transient DefaultOutputPort<Double> output = new DefaultOutputPort<>(); + } + + public static class ControlAwareProcessor extends BaseOperator + { + public final transient ControlAwareDefaultInputPort<Double> input = new ControlAwareDefaultInputPort<Double>() + { + @Override + public void process(Double tuple) + { + output.emit(tuple); + } + + @Override + public boolean processControl(UserDefinedControlTuple tuple) + { + output.emitControl(tuple); + return true; + } + }; + + public final transient ControlAwareDefaultOutputPort<Double> output = new ControlAwareDefaultOutputPort<>(); + } + + public static class ControlAwareReceiver extends BaseOperator + { + private long currentWindowId; + + @Override + public void beginWindow(long windowId) + { + currentWindowId = windowId; + } + + public final transient ControlAwareDefaultInputPort<Double> input = new ControlAwareDefaultInputPort<Double>() + { + @Override + public boolean processControl(UserDefinedControlTuple payload) + { + numControlTuples++; + return false; + } + + @Override + public void process(Double tuple) + { + } + }; + 
+ @Override + public void endWindow() + { + if (done && currentWindowId > endingWindowId) { + endApp = true; + } + } + } + + @ApplicationAnnotation(name = "TestDefaultPropagation") + public static class Application1 implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class); + DefaultProcessor processor = dag.addOperator("process", DefaultProcessor.class); + ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class); + dag.addStream("genToProcessor", randomGenerator.out, processor.input); + dag.addStream("ProcessorToReceiver", processor.output, receiver.input); + } + } + + @ApplicationAnnotation(name = "TestExplicitPropagation") + public static class Application2 implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class); + ControlAwareProcessor processor = dag.addOperator("process", ControlAwareProcessor.class); + ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class); + dag.addStream("genToProcessor", randomGenerator.out, processor.input); + dag.addStream("ProcessorToReceiver", processor.output, receiver.input); + } + } + + @ApplicationAnnotation(name = "TestDuplicateControlTuples") + public static class Application3 implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class); + DefaultProcessor processor = dag.addOperator("process", DefaultProcessor.class); + ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class); + dag.addStream("genToProcessor", randomGenerator.out, processor.input); + dag.addStream("ProcessorToReceiver", processor.output, receiver.input); + 
dag.setOperatorAttribute(processor, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<>(2)); + } + } + + @ApplicationAnnotation(name = "TestThreadLocal") + public static class Application4 implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class); + DefaultProcessor processor = dag.addOperator("process", DefaultProcessor.class); + ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class); + dag.addStream("genToProcessor", randomGenerator.out, processor.input).setLocality(DAG.Locality.THREAD_LOCAL); + dag.addStream("ProcessorToReceiver", processor.output, receiver.input).setLocality(DAG.Locality.THREAD_LOCAL); + } + } + + @ApplicationAnnotation(name = "TestContainerLocal") + public static class Application5 implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class); + DefaultProcessor processor = dag.addOperator("process", DefaultProcessor.class); + ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class); + dag.addStream("genToProcessor", randomGenerator.out, processor.input).setLocality(DAG.Locality.CONTAINER_LOCAL); + dag.addStream("ProcessorToReceiver", processor.output, receiver.input).setLocality(DAG.Locality.CONTAINER_LOCAL); + } + } + + public void testApp(StreamingApplication app) throws Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(app, conf); + LocalMode.Controller lc = lma.getController(); + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + return endApp; + } + }); + + lc.run(200000); // runs for 20 seconds and quits if terminating condition not reached + + 
LOG.info("Control Tuples received {} expected {}", numControlTuples, controlIndex); + Assert.assertTrue("Incorrect Control Tuples", numControlTuples == controlIndex); + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + + @Test + public void testDefaultPropagation() throws Exception + { + immediate = false; + testApp(new Application1()); + } + + @Test + public void testExplicitPropagation() throws Exception + { + immediate = false; + testApp(new Application2()); + } + + @Test + public void testDuplicateControlTuples() throws Exception + { + immediate = false; + testApp(new Application3()); + } + + @Test + public void testThreadLocal() throws Exception + { + immediate = false; + testApp(new Application4()); + } + + @Test + public void testContainerLocal() throws Exception + { + immediate = false; + testApp(new Application5()); + } + + @Test + public void testDefaultPropagationImmediate() throws Exception + { + immediate = true; + testApp(new Application1()); + } + + @Test + public void testExplicitPropagationImmediate() throws Exception + { + immediate = true; + testApp(new Application2()); + } + + @Test + public void testDuplicateControlTuplesImmediate() throws Exception + { + immediate = true; + testApp(new Application3()); + } + + @Test + public void testThreadLocalImmediate() throws Exception + { + immediate = true; + testApp(new Application4()); + } + + @Test + public void testContainerLocalImmediate() throws Exception + { + immediate = true; + testApp(new Application5()); + } + + public static class TestControlTuple implements UserDefinedControlTuple + { + public long data; + public boolean immediate; + + public TestControlTuple() + { + data = 0; + } + + public TestControlTuple(long data, boolean immediate) + { + this.data = data; + this.immediate = immediate; + } + + @Override + public boolean equals(Object t) + { + if (t instanceof TestControlTuple && ((TestControlTuple)t).data == 
this.data) { + return true; + } + return false; + } + + @Override + public String toString() + { + return data + ""; + } + + @Override + public DeliveryType getDeliveryType() + { + if (immediate) { + return DeliveryType.IMMEDIATE; + } else { + return DeliveryType.END_WINDOW; + } + } + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java index af99e98..99dee8f 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java @@ -62,10 +62,13 @@ import com.datatorrent.common.util.ScheduledExecutorService; import com.datatorrent.common.util.ScheduledThreadPoolExecutor; import com.datatorrent.netlet.DefaultEventLoop; import com.datatorrent.netlet.EventLoop; +import com.datatorrent.stram.CustomControlTupleTest; import com.datatorrent.stram.api.Checkpoint; import com.datatorrent.stram.codec.DefaultStatefulStreamCodec; import com.datatorrent.stram.stream.BufferServerPublisher; import com.datatorrent.stram.stream.BufferServerSubscriber; +import com.datatorrent.stram.stream.OiOStream; +import com.datatorrent.stram.tuple.CustomControlTuple; import com.datatorrent.stram.tuple.EndStreamTuple; import com.datatorrent.stram.tuple.EndWindowTuple; import com.datatorrent.stram.tuple.Tuple; @@ -609,6 +612,195 @@ public class GenericNodeTest } @Test + public void testControlTuplesDeliveryGenericNode() throws InterruptedException + { + long maxSleep = 5000000; + long sleeptime = 25L; + GenericOperator go = new GenericOperator(); + final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator", + new DefaultAttributeMap(), null)); + 
gn.setId(1); + AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024); + + gn.connectInputPort("ip1", reservoir1); + TestSink testSink = new TestSink(); + gn.connectOutputPort("op", testSink); + gn.firstWindowMillis = 0; + gn.windowWidthMillis = 100; + + final AtomicBoolean ab = new AtomicBoolean(false); + Thread t = new Thread() + { + @Override + public void run() + { + ab.set(true); + gn.activate(); + gn.run(); + gn.deactivate(); + } + }; + t.start(); + + long interval = 0; + do { + Thread.sleep(sleeptime); + interval += sleeptime; + } while ((ab.get() == false) && (interval < maxSleep)); + + int controlTupleCount = gn.controlTupleCount; + Tuple beginWindow = new Tuple(MessageType.BEGIN_WINDOW, 0x1L); + reservoir1.add(beginWindow); + + interval = 0; + do { + Thread.sleep(sleeptime); + interval += sleeptime; + } while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep)); + controlTupleCount = gn.controlTupleCount; + + CustomControlTuple t1 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(1, false)); + CustomControlTuple t2 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(2, true)); + CustomControlTuple t3 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(3, false)); + CustomControlTuple t4 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(4, true)); + reservoir1.add(t1); + reservoir1.add(t2); + reservoir1.add(t3); + reservoir1.add(t4); + + interval = 0; + do { + Thread.sleep(sleeptime); + interval += sleeptime; + } while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep)); + + Assert.assertTrue("Custom control tuples emitted immediately", testSink.getResultCount() == 3); + + controlTupleCount = gn.controlTupleCount; + Tuple endWindow = new Tuple(MessageType.END_WINDOW, 0x1L); + reservoir1.add(endWindow); + + interval = 0; + do { + Thread.sleep(sleeptime); + interval += sleeptime; + } while ((gn.controlTupleCount == 
controlTupleCount) && (interval < maxSleep)); + + gn.shutdown(); + t.join(); + + Assert.assertTrue("Total control tuples", testSink.getResultCount() == 6); + + long expected = 0; + for (Object o: testSink.collectedTuples) { + if (o instanceof CustomControlTuple) { + expected++; + } + } + Assert.assertTrue("Number of Custom control tuples", expected == 4); + } + + @Test + public void testControlTuplesDeliveryOiONode() throws InterruptedException + { + GenericOperator go = new GenericOperator(); + final OiONode oioNode = new OiONode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator", + new DefaultAttributeMap(), null)); + oioNode.setId(1); + + OiOStream stream = new OiOStream(); + SweepableReservoir reservoir = stream.getReservoir(); + ((OiOStream.OiOReservoir)reservoir).setControlSink((oioNode).getControlSink(reservoir)); + oioNode.connectInputPort("ip1", reservoir); + Sink controlSink = oioNode.getControlSink(reservoir); + + TestSink testSink = new TestSink(); + oioNode.connectOutputPort("op", testSink); + oioNode.firstWindowMillis = 0; + oioNode.windowWidthMillis = 100; + + oioNode.activate(); + + Tuple beginWindow = new Tuple(MessageType.BEGIN_WINDOW, 0x1L); + controlSink.put(beginWindow); + Assert.assertTrue("Begin window", testSink.getResultCount() == 1); + + CustomControlTuple t1 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(1, false)); + CustomControlTuple t2 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(2, true)); + CustomControlTuple t3 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(3, false)); + CustomControlTuple t4 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(4, true)); + controlSink.put(t1); + controlSink.put(t2); + controlSink.put(t3); + controlSink.put(t4); + Assert.assertTrue("Custom control tuples emitted immediately", testSink.getResultCount() == 3); + + Tuple endWindow = new Tuple(MessageType.END_WINDOW, 0x1L); + controlSink.put(endWindow); 
+ + oioNode.deactivate(); + oioNode.shutdown(); + + Assert.assertTrue("Total control tuples", testSink.getResultCount() == 6); + + long expected = 0; + for (Object o: testSink.collectedTuples) { + if (o instanceof CustomControlTuple) { + expected++; + } + } + Assert.assertTrue("Number of Custom control tuples", expected == 4); + } + + @Test + public void testReservoirPortMapping() throws InterruptedException + { + long maxSleep = 5000; + long sleeptime = 25L; + GenericOperator go = new GenericOperator(); + final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator", + new DefaultAttributeMap(), null)); + gn.setId(1); + AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024); + AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024); + + gn.connectInputPort("ip1", reservoir1); + gn.connectInputPort("ip2", reservoir2); + gn.connectOutputPort("op", Sink.BLACKHOLE); + gn.firstWindowMillis = 0; + gn.windowWidthMillis = 100; + + final AtomicBoolean ab = new AtomicBoolean(false); + Thread t = new Thread() + { + @Override + public void run() + { + ab.set(true); + gn.activate(); + gn.run(); + gn.deactivate(); + } + }; + t.start(); + + long interval = 0; + do { + Thread.sleep(sleeptime); + interval += sleeptime; + } while ((ab.get() == false) && (interval < maxSleep)); + + gn.populateReservoirInputPortMap(); + + gn.shutdown(); + t.join(); + + Assert.assertTrue("Port Mapping Size", gn.reservoirPortMap.size() == 2); + Assert.assertTrue("Sink 1 is not a port", gn.reservoirPortMap.get(reservoir1) instanceof Operator.InputPort); + Assert.assertTrue("Sink 2 is not a port", gn.reservoirPortMap.get(reservoir2) instanceof Operator.InputPort); + } + + @Test public void testDoubleCheckpointAtleastOnce() throws Exception { NodeTest.testDoubleCheckpointHandling(ProcessingMode.AT_LEAST_ONCE, true, testMeta.getDir());
