Repository: incubator-apex-core Updated Branches: refs/heads/devel-3 d0908e4bc -> b3402be5a
APEXCORE-60 Iteration support in Apex Core Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f7e1ccf1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f7e1ccf1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f7e1ccf1 Branch: refs/heads/devel-3 Commit: f7e1ccf14154eca92b24c5f6b5387fe56c516829 Parents: d0908e4 Author: David Yan <[email protected]> Authored: Wed Dec 9 15:52:26 2015 -0800 Committer: David Yan <[email protected]> Committed: Fri Jan 22 19:03:46 2016 -0800 ---------------------------------------------------------------------- .../main/java/com/datatorrent/api/Context.java | 7 + .../main/java/com/datatorrent/api/Operator.java | 19 + .../common/util/DefaultDelayOperator.java | 75 ++++ .../datatorrent/stram/StramLocalCluster.java | 15 + .../stram/StreamingContainerManager.java | 56 ++- .../datatorrent/stram/engine/GenericNode.java | 190 +++++++--- .../java/com/datatorrent/stram/engine/Node.java | 6 +- .../stram/engine/StreamingContainer.java | 2 + .../stram/engine/WindowGenerator.java | 14 +- .../stram/plan/logical/LogicalPlan.java | 53 +++ .../stram/plan/physical/PTOperator.java | 4 +- .../stram/plan/physical/PhysicalPlan.java | 19 +- .../stram/plan/physical/StreamMapping.java | 4 +- .../java/com/datatorrent/stram/tuple/Tuple.java | 5 + .../stram/debug/TupleRecorderTest.java | 208 +++++----- .../stram/engine/GenericNodeTest.java | 18 +- .../stram/engine/GenericTestOperator.java | 3 + .../stram/plan/logical/DelayOperatorTest.java | 377 +++++++++++++++++++ 18 files changed, 888 insertions(+), 187 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/api/src/main/java/com/datatorrent/api/Context.java ---------------------------------------------------------------------- diff --git 
a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index ceed8a2..58bc552 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -166,6 +166,13 @@ public interface Context */ Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>()); + /** + * Attribute of input port. + * This is a read-only attribute to query whether the input port is connected to a DelayOperator + * This is for iterative processing. + */ + Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false); + @SuppressWarnings("FieldNameHidesFieldInSuperclass") long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/api/src/main/java/com/datatorrent/api/Operator.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Operator.java b/api/src/main/java/com/datatorrent/api/Operator.java index 785c60b..d4a6a90 100644 --- a/api/src/main/java/com/datatorrent/api/Operator.java +++ b/api/src/main/java/com/datatorrent/api/Operator.java @@ -99,6 +99,25 @@ public interface Operator extends Component<OperatorContext> } /** + * DelayOperator is an operator of which the outgoing streaming window id is incremented by *one* by the + * engine, thus allowing loops in the "DAG". The output ports of a DelayOperator, if connected, *must* + * immediately connect to an upstream operator in the data flow path. Note that at least one output port of + * DelayOperator should be connected in order for the DelayOperator to serve its purpose. + * + * This is meant for iterative algorithms in the topology. A larger window increment can be simulated by an + * implementation of this interface. 
+ */ + interface DelayOperator extends Operator + { + /** + * This method gets called at the first window of the execution. + * The implementation is expected to emit tuples for initialization and/or + * recovery. + */ + void firstWindow(); + } + + /** * A operator provides ports as a means to consume and produce data tuples. * Concrete ports implement derived interfaces. */ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java new file mode 100644 index 0000000..ff676d4 --- /dev/null +++ b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.common.util; + +import java.util.ArrayList; +import java.util.List; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; + +/** + * DefaultDelayOperator. 
This is an implementation of the DelayOperator that has one input port and one output + * port, and does a simple pass-through from the input port to the output port, while recording the tuples in memory + * as checkpoint state. Subclass of this operator can override this behavior by overriding processTuple(T tuple). + * + * Note that the engine automatically does a +1 on the output window ID since it is a DelayOperator. + * + * This DelayOperator provides no data loss during recovery, but it incurs a run-time cost per tuple, and all tuples + * of the checkpoint window will be part of the checkpoint state. + */ +public class DefaultDelayOperator<T> extends BaseOperator implements Operator.DelayOperator +{ + public transient DefaultInputPort<T> input = new DefaultInputPort<T>() + { + @Override + public void process(T tuple) + { + processTuple(tuple); + } + }; + + public transient DefaultOutputPort<T> output = new DefaultOutputPort<T>(); + + protected List<T> lastWindowTuples = new ArrayList<>(); + + protected void processTuple(T tuple) + { + lastWindowTuples.add(tuple); + output.emit(tuple); + } + + @Override + public void beginWindow(long windowId) + { + lastWindowTuples.clear(); + } + + @Override + public void firstWindow() + { + for (T tuple : lastWindowTuples) { + output.emit(tuple); + } + } + +} + + http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java index 29e8e03..cda2a38 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java +++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java @@ -27,6 +27,7 @@ import java.net.SocketAddress; import java.util.Collections; import java.util.List; import java.util.Map; +import 
java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -80,6 +81,7 @@ public class StramLocalCluster implements Runnable, Controller private boolean appDone = false; private final Map<String, StreamingContainer> injectShutdown = new ConcurrentHashMap<String, StreamingContainer>(); private boolean heartbeatMonitoringEnabled = true; + private Callable<Boolean> exitCondition; public interface MockComponentFactory { @@ -427,6 +429,11 @@ public class StramLocalCluster implements Runnable, Controller this.perContainerBufferServer = perContainerBufferServer; } + public void setExitCondition(Callable<Boolean> exitCondition) + { + this.exitCondition = exitCondition; + } + @Override public void run() { @@ -476,6 +483,14 @@ public class StramLocalCluster implements Runnable, Controller appDone = true; } + try { + if (exitCondition != null && exitCondition.call()) { + appDone = true; + } + } catch (Exception ex) { + break; + } + if (Thread.interrupted()) { break; } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 4b79589..6233697 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -1008,7 +1008,7 @@ public class StreamingContainerManager implements PlanContext return operatorStatus.latencyMA.getAvg(); } for (PTOperator.PTInput input : maxOperator.getInputs()) { - if (null != input.source.source) { + if (null != input.source.source && !input.delay) { operators.add(input.source.source); } } @@ -1896,6 +1896,19 @@ public class StreamingContainerManager 
implements PlanContext } + private void addVisited(PTOperator operator, UpdateCheckpointsContext ctx) + { + ctx.visited.add(operator); + for (PTOperator.PTOutput out : operator.getOutputs()) { + for (PTOperator.PTInput sink : out.sinks) { + PTOperator sinkOperator = sink.target; + if (!ctx.visited.contains(sinkOperator)) { + addVisited(sinkOperator, ctx); + } + } + } + } + /** * Compute checkpoints required for a given operator instance to be recovered. * This is done by looking at checkpoints available for downstream dependencies first, @@ -1913,6 +1926,9 @@ public class StreamingContainerManager implements PlanContext if (operator.getState() == PTOperator.State.ACTIVE && (ctx.currentTms - operator.stats.lastWindowIdChangeTms) > operator.stats.windowProcessingTimeoutMillis) { // if the checkpoint is ahead, then it is not blocked but waiting for activation (state-less recovery, at-most-once) if (ctx.committedWindowId.longValue() >= operator.getRecoveryCheckpoint().windowId) { + LOG.debug("Marking operator {} blocked committed window {}, recovery window {}", operator, + Codec.getStringWindowId(ctx.committedWindowId.longValue()), + Codec.getStringWindowId(operator.getRecoveryCheckpoint().windowId)); ctx.blocked.add(operator); } } @@ -1922,25 +1938,30 @@ public class StreamingContainerManager implements PlanContext long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS)); maxCheckpoint = currentWindowId; } + ctx.visited.add(operator); // DFS downstream operators - for (PTOperator.PTOutput out : operator.getOutputs()) { - for (PTOperator.PTInput sink : out.sinks) { - PTOperator sinkOperator = sink.target; - if (!ctx.visited.contains(sinkOperator)) { - // downstream traversal - updateRecoveryCheckpoints(sinkOperator, ctx); - } - // recovery window id cannot move backwards - // when dynamically adding new operators - if 
(sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) { - maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId); - } + if (operator.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) { + addVisited(operator, ctx); + } else { + for (PTOperator.PTOutput out : operator.getOutputs()) { + for (PTOperator.PTInput sink : out.sinks) { + PTOperator sinkOperator = sink.target; + if (!ctx.visited.contains(sinkOperator)) { + // downstream traversal + updateRecoveryCheckpoints(sinkOperator, ctx); + } + // recovery window id cannot move backwards + // when dynamically adding new operators + if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) { + maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId); + } - if (ctx.blocked.contains(sinkOperator)) { - if (sinkOperator.stats.getCurrentWindowId() == operator.stats.getCurrentWindowId()) { - // downstream operator is blocked by this operator - ctx.blocked.remove(sinkOperator); + if (ctx.blocked.contains(sinkOperator)) { + if (sinkOperator.stats.getCurrentWindowId() == operator.stats.getCurrentWindowId()) { + // downstream operator is blocked by this operator + ctx.blocked.remove(sinkOperator); + } } } } @@ -1975,7 +1996,6 @@ public class StreamingContainerManager implements PlanContext LOG.debug("Skipping checkpoint update {} during {}", operator, operator.getState()); } - ctx.visited.add(operator); } public long windowIdToMillis(long windowId) http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java index 93cee49..4777f93 100644 --- 
a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java @@ -34,11 +34,14 @@ import com.datatorrent.api.Operator.ShutdownException; import com.datatorrent.api.Sink; import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.bufferserver.packet.MessageType; import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.netlet.util.CircularBuffer; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats; import com.datatorrent.stram.debug.TappedReservoir; +import com.datatorrent.stram.plan.logical.Operators; +import com.datatorrent.stram.tuple.ResetWindowTuple; import com.datatorrent.stram.tuple.Tuple; /** @@ -198,6 +201,15 @@ public class GenericNode extends Node<Operator> insideWindow = applicationWindowCount != 0; } + private boolean isInputPortConnectedToDelayOperator(String portName) + { + Operators.PortContextPair<InputPort<?>> pcPair = descriptor.inputPorts.get(portName); + if (pcPair == null || pcPair.context == null) { + return false; + } + return pcPair.context.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR); + } + /** * Originally this method was defined in an attempt to implement the interface Runnable. 
* @@ -212,30 +224,67 @@ public class GenericNode extends Node<Operator> long spinMillis = context.getValue(OperatorContext.SPIN_MILLIS); final boolean handleIdleTime = operator instanceof IdleTimeHandler; int totalQueues = inputs.size(); + int regularQueues = totalQueues; + // regularQueues is the number of queues that are not connected to a DelayOperator + for (String portName : inputs.keySet()) { + if (isInputPortConnectedToDelayOperator(portName)) { + regularQueues--; + } + } - ArrayList<SweepableReservoir> activeQueues = new ArrayList<SweepableReservoir>(); - activeQueues.addAll(inputs.values()); + ArrayList<Map.Entry<String, SweepableReservoir>> activeQueues = new ArrayList<>(); + activeQueues.addAll(inputs.entrySet()); int expectingBeginWindow = activeQueues.size(); int receivedEndWindow = 0; + long firstWindowId = -1; TupleTracker tracker; LinkedList<TupleTracker> resetTupleTracker = new LinkedList<TupleTracker>(); - try { do { - Iterator<SweepableReservoir> buffers = activeQueues.iterator(); + Iterator<Map.Entry<String, SweepableReservoir>> buffers = activeQueues.iterator(); activequeue: while (buffers.hasNext()) { - SweepableReservoir activePort = buffers.next(); + Map.Entry<String, SweepableReservoir> activePortEntry = buffers.next(); + SweepableReservoir activePort = activePortEntry.getValue(); Tuple t = activePort.sweep(); if (t != null) { + boolean delay = (operator instanceof Operator.DelayOperator); + long windowAhead = 0; + if (delay) { + windowAhead = WindowGenerator.getAheadWindowId(t.getWindowId(), firstWindowMillis, windowWidthMillis, 1); + } switch (t.getType()) { case BEGIN_WINDOW: if (expectingBeginWindow == totalQueues) { + // This is the first begin window tuple among all ports + if (isInputPortConnectedToDelayOperator(activePortEntry.getKey())) { + // We need to wait for the first BEGIN_WINDOW from a port not connected to DelayOperator before + // we can do anything with it, because otherwise if a CHECKPOINT tuple arrives from + // 
upstream after the BEGIN_WINDOW tuple for the next window from the delay operator, it would end + // up checkpointing in the middle of the window. This code is assuming we have at least one + // input port that is not connected to a DelayOperator, and we might have to change this later. + // In the future, this condition will not be needed if we get rid of the CHECKPOINT tuple. + continue; + } activePort.remove(); expectingBeginWindow--; + receivedEndWindow = 0; currentWindowId = t.getWindowId(); + if (delay) { + if (WindowGenerator.getBaseSecondsFromWindowId(windowAhead) > t.getBaseSeconds()) { + // Buffer server code strips out the base seconds from BEGIN_WINDOW and END_WINDOW tuples for + // serialization optimization. That's why we need a reset window here to tell the buffer + // server we are having a new baseSeconds now. + Tuple resetWindowTuple = new ResetWindowTuple(windowAhead); + for (int s = sinks.length; s-- > 0; ) { + sinks[s].put(resetWindowTuple); + } + controlTupleCount++; + } + t.setWindowId(windowAhead); + } for (int s = sinks.length; s-- > 0; ) { sinks[s].put(t); } @@ -245,7 +294,6 @@ public class GenericNode extends Node<Operator> insideWindow = true; operator.beginWindow(currentWindowId); } - receivedEndWindow = 0; } else if (t.getWindowId() == currentWindowId) { activePort.remove(); @@ -253,17 +301,7 @@ public class GenericNode extends Node<Operator> } else { buffers.remove(); - - /* find the name of the port which got out of sequence tuple */ - String port = null; - for (Entry<String, SweepableReservoir> e : inputs.entrySet()) { - if (e.getValue() == activePort) { - port = e.getKey(); - } - } - - assert (port != null); /* we should always find the port */ - + String port = activePortEntry.getKey(); if (PROCESSING_MODE == ProcessingMode.AT_MOST_ONCE) { if (t.getWindowId() < currentWindowId) { /* @@ -279,21 +317,21 @@ public class GenericNode extends Node<Operator> WindowIdActivatedReservoir wiar = new WindowIdActivatedReservoir(port, 
activePort, currentWindowId); wiar.setSink(sink); inputs.put(port, wiar); - activeQueues.add(wiar); + activeQueues.add(new AbstractMap.SimpleEntry<String, SweepableReservoir>(port, wiar)); break activequeue; } else { expectingBeginWindow--; if (++receivedEndWindow == totalQueues) { processEndWindow(null); - activeQueues.addAll(inputs.values()); + activeQueues.addAll(inputs.entrySet()); expectingBeginWindow = activeQueues.size(); break activequeue; } } } else { - logger.error("Catastrophic Error: Out of sequence tuple {} on port {} while expecting {}", Codec.getStringWindowId(t.getWindowId()), port, Codec.getStringWindowId(currentWindowId)); + logger.error("Catastrophic Error: Out of sequence {} tuple {} on port {} while expecting {}", t.getType(), Codec.getStringWindowId(t.getWindowId()), port, Codec.getStringWindowId(currentWindowId)); System.exit(2); } } @@ -306,8 +344,11 @@ public class GenericNode extends Node<Operator> endWindowDequeueTimes.put(activePort, System.currentTimeMillis()); if (++receivedEndWindow == totalQueues) { assert (activeQueues.isEmpty()); + if (delay) { + t.setWindowId(windowAhead); + } processEndWindow(t); - activeQueues.addAll(inputs.values()); + activeQueues.addAll(inputs.entrySet()); expectingBeginWindow = activeQueues.size(); break activequeue; } @@ -330,11 +371,12 @@ public class GenericNode extends Node<Operator> doCheckpoint = true; } } - - for (int s = sinks.length; s-- > 0; ) { - sinks[s].put(t); + if (!delay) { + for (int s = sinks.length; s-- > 0; ) { + sinks[s].put(t); + } + controlTupleCount++; } - controlTupleCount++; } break; @@ -343,12 +385,14 @@ public class GenericNode extends Node<Operator> * we will receive tuples which are equal to the number of input streams. 
*/ activePort.remove(); - buffers.remove(); + if (isInputPortConnectedToDelayOperator(activePortEntry.getKey())) { + break; // breaking out of the switch/case + } + buffers.remove(); int baseSeconds = t.getBaseSeconds(); tracker = null; - Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator(); - while (trackerIterator.hasNext()) { + for (Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator(); trackerIterator.hasNext(); ) { tracker = trackerIterator.next(); if (tracker.tuple.getBaseSeconds() == baseSeconds) { break; @@ -356,7 +400,7 @@ public class GenericNode extends Node<Operator> } if (tracker == null) { - tracker = new TupleTracker(t, totalQueues); + tracker = new TupleTracker(t, regularQueues); resetTupleTracker.add(tracker); } int trackerIndex = 0; @@ -364,29 +408,50 @@ public class GenericNode extends Node<Operator> if (tracker.ports[trackerIndex] == null) { tracker.ports[trackerIndex++] = activePort; break; - } - else if (tracker.ports[trackerIndex] == activePort) { + } else if (tracker.ports[trackerIndex] == activePort) { break; } trackerIndex++; } - if (trackerIndex == totalQueues) { - trackerIterator = resetTupleTracker.iterator(); + if (trackerIndex == regularQueues) { + Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator(); while (trackerIterator.hasNext()) { if (trackerIterator.next().tuple.getBaseSeconds() <= baseSeconds) { trackerIterator.remove(); } } - for (int s = sinks.length; s-- > 0; ) { - sinks[s].put(t); + if (!delay) { + for (int s = sinks.length; s-- > 0; ) { + sinks[s].put(t); + } + controlTupleCount++; } - controlTupleCount++; - - assert (activeQueues.isEmpty()); - activeQueues.addAll(inputs.values()); + if (!activeQueues.isEmpty()) { + // make sure they are all queues from DelayOperator + for (Map.Entry<String, SweepableReservoir> entry : activeQueues) { + if (!isInputPortConnectedToDelayOperator(entry.getKey())) { + assert (false); + } + } + activeQueues.clear(); + } + 
activeQueues.addAll(inputs.entrySet()); expectingBeginWindow = activeQueues.size(); + + if (firstWindowId == -1) { + if (delay) { + for (int s = sinks.length; s-- > 0; ) { + sinks[s].put(t); + } + controlTupleCount++; + // if it's a DelayOperator and this is the first RESET_WINDOW (start) or END_STREAM + // (recovery), fabricate the first window + fabricateFirstWindow((Operator.DelayOperator)operator, windowAhead); + } + firstWindowId = t.getWindowId(); + } break activequeue; } break; @@ -394,6 +459,15 @@ public class GenericNode extends Node<Operator> case END_STREAM: activePort.remove(); buffers.remove(); + if (firstWindowId == -1) { + // this is for recovery from a checkpoint for DelayOperator + if (delay) { + // if it's a DelayOperator and this is the first RESET_WINDOW (start) or END_STREAM (recovery), + // fabricate the first window + fabricateFirstWindow((Operator.DelayOperator)operator, windowAhead); + } + firstWindowId = t.getWindowId(); + } for (Iterator<Entry<String, SweepableReservoir>> it = inputs.entrySet().iterator(); it.hasNext(); ) { Entry<String, SweepableReservoir> e = it.next(); if (e.getValue() == activePort) { @@ -409,7 +483,7 @@ public class GenericNode extends Node<Operator> if (e.getKey().equals(dic.portname)) { connectInputPort(dic.portname, dic.reservoir); dici.remove(); - activeQueues.add(dic.reservoir); + activeQueues.add(new AbstractMap.SimpleEntry<>(dic.portname, dic.reservoir)); break activequeue; } } @@ -427,17 +501,18 @@ public class GenericNode extends Node<Operator> * Since one of the operators we care about it gone, we should relook at our ports. * We need to make sure that the END_STREAM comes outside of the window. 
*/ + regularQueues--; totalQueues--; boolean break_activequeue = false; - if (totalQueues == 0) { + if (regularQueues == 0) { alive = false; break_activequeue = true; } else if (activeQueues.isEmpty()) { assert (!inputs.isEmpty()); processEndWindow(null); - activeQueues.addAll(inputs.values()); + activeQueues.addAll(inputs.entrySet()); expectingBeginWindow = activeQueues.size(); break_activequeue = true; } @@ -450,22 +525,22 @@ public class GenericNode extends Node<Operator> * it's the only one which has not, then we consider it delivered and release the reset tuple downstream. */ Tuple tuple = null; - for (trackerIterator = resetTupleTracker.iterator(); trackerIterator.hasNext(); ) { + for (Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator(); trackerIterator.hasNext(); ) { tracker = trackerIterator.next(); trackerIndex = 0; while (trackerIndex < tracker.ports.length) { if (tracker.ports[trackerIndex] == activePort) { - SweepableReservoir[] ports = new SweepableReservoir[totalQueues]; + SweepableReservoir[] ports = new SweepableReservoir[regularQueues]; System.arraycopy(tracker.ports, 0, ports, 0, trackerIndex); - if (trackerIndex < totalQueues) { + if (trackerIndex < regularQueues) { System.arraycopy(tracker.ports, trackerIndex + 1, ports, trackerIndex, tracker.ports.length - trackerIndex - 1); } tracker.ports = ports; break; } else if (tracker.ports[trackerIndex] == null) { - if (trackerIndex == totalQueues) { /* totalQueues is already adjusted above */ + if (trackerIndex == regularQueues) { /* regularQueues is already adjusted above */ if (tuple == null || tuple.getBaseSeconds() < tracker.tuple.getBaseSeconds()) { tuple = tracker.tuple; } @@ -475,7 +550,7 @@ public class GenericNode extends Node<Operator> break; } else { - tracker.ports = Arrays.copyOf(tracker.ports, totalQueues); + tracker.ports = Arrays.copyOf(tracker.ports, regularQueues); } trackerIndex++; @@ -485,7 +560,7 @@ public class GenericNode extends Node<Operator> /* * Since we 
were waiting for a reset tuple on this stream, we should not any longer. */ - if (tuple != null) { + if (tuple != null && !delay) { for (int s = sinks.length; s-- > 0; ) { sinks[s].put(tuple); } @@ -509,8 +584,8 @@ public class GenericNode extends Node<Operator> } else { boolean need2sleep = true; - for (SweepableReservoir cb : activeQueues) { - if (cb.size() > 0) { + for (Map.Entry<String, SweepableReservoir> cb : activeQueues) { + if (cb.getValue().size() > 0) { need2sleep = false; break; } @@ -582,6 +657,21 @@ public class GenericNode extends Node<Operator> } + private void fabricateFirstWindow(Operator.DelayOperator delayOperator, long windowAhead) + { + Tuple beginWindowTuple = new Tuple(MessageType.BEGIN_WINDOW, windowAhead); + Tuple endWindowTuple = new Tuple(MessageType.END_WINDOW, windowAhead); + for (Sink<Object> sink : outputs.values()) { + sink.put(beginWindowTuple); + } + controlTupleCount++; + delayOperator.firstWindow(); + for (Sink<Object> sink : outputs.values()) { + sink.put(endWindowTuple); + } + controlTupleCount++; + } + /** * End window dequeue times may not have been saved for all the input ports during deactivate, * so save them for reporting. SPOI-1324. 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/engine/Node.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java index 068a325..d4970cd 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java @@ -126,6 +126,8 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera private ExecutorService executorService; private Queue<Pair<FutureTask<Stats.CheckpointStats>, CheckpointWindowInfo>> taskQueue; protected Stats.CheckpointStats checkpointStats; + public long firstWindowMillis; + public long windowWidthMillis; public Node(OPERATOR operator, OperatorContext context) { @@ -354,7 +356,9 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera protected void emitEndWindow() { - EndWindowTuple ewt = new EndWindowTuple(currentWindowId); + long windowId = (operator instanceof Operator.DelayOperator) ? 
+ WindowGenerator.getAheadWindowId(currentWindowId, firstWindowMillis, windowWidthMillis, 1) : currentWindowId; + EndWindowTuple ewt = new EndWindowTuple(windowId); for (int s = sinks.length; s-- > 0; ) { sinks[s].put(ewt); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java index 14e00a9..79d9037 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java @@ -894,6 +894,8 @@ public class StreamingContainer extends YarnContainerMain Node<?> node = Node.retrieveNode(backupAgent.load(ndi.id, ctx.stateless ? Stateless.WINDOW_ID : ndi.checkpoint.windowId), ctx, ndi.type); node.currentWindowId = ndi.checkpoint.windowId; node.applicationWindowCount = ndi.checkpoint.applicationWindowCount; + node.firstWindowMillis = firstWindowMillis; + node.windowWidthMillis = windowWidthMillis; node.setId(ndi.id); nodes.put(ndi.id, node); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java index 5610112..ea429af 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java @@ -314,13 +314,25 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable long baseMillis = (windowId >> 32) * 1000; long diff = baseMillis - 
firstWindowMillis; long baseChangeInterval = windowWidthMillis * (WindowGenerator.MAX_WINDOW_ID + 1); + assert (baseChangeInterval > 0); long multiplier = diff / baseChangeInterval; if (diff % baseChangeInterval > 0) { multiplier++; } assert (multiplier >= 0); windowId = windowId & WindowGenerator.WINDOW_MASK; - return firstWindowMillis + (multiplier * windowWidthMillis * (WindowGenerator.MAX_WINDOW_ID + 1)) + windowId * windowWidthMillis; + return firstWindowMillis + (multiplier * baseChangeInterval) + (windowId * windowWidthMillis); + } + + /** + * Utility function to get the base seconds from a window id + * + * @param windowId + * @return the base seconds for the given window id + */ + public static long getBaseSecondsFromWindowId(long windowId) + { + return windowId >>> 32; } private class MasterReservoir extends CircularBuffer<Tuple> implements Reservoir http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 867f814..3c26118 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -1758,6 +1758,14 @@ public class LogicalPlan implements Serializable, DAG throw new ValidationException("Loops in graph: " + cycles); } + List<List<String>> invalidDelays = new ArrayList<>(); + for (OperatorMeta n : rootOperators) { + findInvalidDelays(n, invalidDelays); + } + if (!invalidDelays.isEmpty()) { + throw new ValidationException("Invalid delays in graph: " + invalidDelays); + } + for (StreamMeta s: streams.values()) { if (s.source == null) { throw new ValidationException("Stream source not connected: " + s.getName()); @@ -1814,6 +1822,11 @@ 
public class LogicalPlan implements Serializable, DAG return; } + if (om.getOperator() instanceof Operator.DelayOperator) { + String msg = String.format("Locality %s invalid for delay operator %s", Locality.THREAD_LOCAL, om); + throw new ValidationException(msg); + } + for (StreamMeta sm: om.inputStreams.values()){ // validation fail as each input stream should be OIO if (sm.locality != Locality.THREAD_LOCAL){ @@ -1822,6 +1835,10 @@ public class LogicalPlan implements Serializable, DAG throw new ValidationException(msg); } + if (sm.source.operatorMeta.getOperator() instanceof Operator.DelayOperator) { + String msg = String.format("Locality %s invalid for delay operator %s", Locality.THREAD_LOCAL, sm.source.operatorMeta); + throw new ValidationException(msg); + } // gets oio root for input operator for the stream Integer oioStreamRoot = getOioRoot(sm.source.operatorMeta); @@ -1895,6 +1912,11 @@ public class LogicalPlan implements Serializable, DAG // depth first successors traversal for (StreamMeta downStream: om.outputStreams.values()) { for (InputPortMeta sink: downStream.sinks) { + if (om.getOperator() instanceof Operator.DelayOperator) { + // this is an iteration loop, do not treat it as downstream when detecting cycles + sink.attributes.put(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR, true); + continue; + } OperatorMeta successor = sink.getOperatorWrapper(); if (successor == null) { continue; @@ -1932,6 +1954,37 @@ public class LogicalPlan implements Serializable, DAG } } + public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays) + { + stack.push(om); + + // depth first successors traversal + boolean isDelayOperator = om.getOperator() instanceof Operator.DelayOperator; + if (isDelayOperator) { + if (om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) != 1) { + LOG.debug("detected DelayOperator having APPLICATION_WINDOW_COUNT not equal to 1"); + invalidDelays.add(Collections.singletonList(om.getName())); + } + } + + for (StreamMeta 
downStream: om.outputStreams.values()) { + for (InputPortMeta sink : downStream.sinks) { + OperatorMeta successor = sink.getOperatorWrapper(); + if (isDelayOperator) { + // Check whether all downstream operators are already visited in the path + if (successor != null && !stack.contains(successor)) { + LOG.debug("detected DelayOperator does not immediately output to a visited operator {}.{}->{}.{}", + om.getName(), downStream.getSource().getPortName(), successor.getName(), sink.getPortName()); + invalidDelays.add(Arrays.asList(om.getName(), successor.getName())); + } + } else { + findInvalidDelays(successor, invalidDelays); + } + } + } + stack.pop(); + } + private void validateProcessingMode(OperatorMeta om, Set<OperatorMeta> visited) { for (StreamMeta is : om.getInputStreams().values()) { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java index 6adfd64..ae276d8 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java @@ -81,6 +81,7 @@ public class PTOperator implements java.io.Serializable public final PartitionKeys partitions; public final PTOutput source; public final String portName; + public final boolean delay; /** * @@ -90,7 +91,7 @@ public class PTOperator implements java.io.Serializable * @param partitions * @param source */ - protected PTInput(String portName, StreamMeta logicalStream, PTOperator target, PartitionKeys partitions, PTOutput source) + protected PTInput(String portName, StreamMeta logicalStream, PTOperator target, PartitionKeys partitions, PTOutput source, boolean delay) { this.logicalStream = logicalStream; 
this.target = target; @@ -98,6 +99,7 @@ public class PTOperator implements java.io.Serializable this.source = source; this.portName = portName; this.source.sinks.add(this); + this.delay = delay; } /** http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index 829a6fd..da96ef3 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -328,8 +328,11 @@ public class PhysicalPlan implements Serializable boolean upstreamDeployed = true; - for (StreamMeta s : n.getInputStreams().values()) { - if (s.getSource() != null && !this.logicalToPTOperator.containsKey(s.getSource().getOperatorMeta())) { + for (Map.Entry<InputPortMeta, StreamMeta> entry : n.getInputStreams().entrySet()) { + StreamMeta s = entry.getValue(); + boolean delay = entry.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR); + // skip delay sources since it's going to be handled as downstream + if (!delay && s.getSource() != null && !this.logicalToPTOperator.containsKey(s.getSource().getOperatorMeta())) { pendingNodes.push(n); pendingNodes.push(s.getSource().getOperatorMeta()); upstreamDeployed = false; @@ -907,7 +910,10 @@ public class PhysicalPlan implements Serializable for (Map.Entry<InputPortMeta, StreamMeta> ipm : m.logicalOperator.getInputStreams().entrySet()) { PMapping sourceMapping = this.logicalToPTOperator.get(ipm.getValue().getSource().getOperatorMeta()); - + if (ipm.getValue().getSource().getOperatorMeta().getOperator() instanceof Operator.DelayOperator) { + // skip if the source is a DelayOperator + continue; + } if 
(ipm.getKey().getValue(PortContext.PARTITION_PARALLEL)) { if (sourceMapping.partitions.size() < m.partitions.size()) { throw new AssertionError("Number of partitions don't match in parallel mapping " + sourceMapping.logicalOperator.getName() + " -> " + m.logicalOperator.getName() + ", " + sourceMapping.partitions.size() + " -> " + m.partitions.size()); @@ -942,11 +948,11 @@ public class PhysicalPlan implements Serializable PTOperator slidingUnifier = StreamMapping.createSlidingUnifier(sourceOut.logicalStream, this, sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT), slidingWindowCount); StreamMapping.addInput(slidingUnifier, sourceOut, null); - input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0)); + input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR)); sourceMapping.outputStreams.get(ipm.getValue().getSource()).slidingUnifiers.add(slidingUnifier); } else { - input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut); + input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR)); } oper.inputs.add(input); } @@ -1445,6 +1451,9 @@ public class PhysicalPlan implements Serializable PMapping upstreamPartitioned = null; for (Map.Entry<LogicalPlan.InputPortMeta, StreamMeta> e : om.getInputStreams().entrySet()) { + if (e.getValue().getSource().getOperatorMeta().getOperator() instanceof Operator.DelayOperator) { + continue; + } PMapping m = logicalToPTOperator.get(e.getValue().getSource().getOperatorMeta()); if (e.getKey().getValue(PortContext.PARTITION_PARALLEL).equals(true)) { // operator partitioned with upstream http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java 
---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java index d42c327..91c6eef 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java @@ -347,7 +347,7 @@ public class StreamMapping implements java.io.Serializable // link to upstream output(s) for this stream for (PTOutput upstreamOut : sourceOper.outputs) { if (upstreamOut.logicalStream == streamMeta) { - PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut); + PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut, ipm.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR)); oper.inputs.add(input); } } @@ -356,7 +356,7 @@ public class StreamMapping implements java.io.Serializable public static void addInput(PTOperator target, PTOutput upstreamOut, PartitionKeys pks) { StreamMeta lStreamMeta = upstreamOut.logicalStream; - PTInput input = new PTInput("<merge#" + lStreamMeta.getSource().getPortName() + ">", lStreamMeta, target, pks, upstreamOut); + PTInput input = new PTInput("<merge#" + lStreamMeta.getSource().getPortName() + ">", lStreamMeta, target, pks, upstreamOut, false); target.inputs.add(input); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java b/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java index 23c197b..9191b65 100644 --- a/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java +++ b/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java @@ -52,6 +52,11 @@ public class Tuple return windowId; } + public void 
setWindowId(long windowId) + { + this.windowId = windowId; + } + public final int getBaseSeconds() { return (int)(windowId >> 32); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java index 3f97b54..1c17d68 100644 --- a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java +++ b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java @@ -76,8 +76,7 @@ public class TupleRecorderTest public TupleRecorder getTupleRecorder(final StramLocalCluster localCluster, final PTOperator op) { TupleRecorderCollection instance = (TupleRecorderCollection)localCluster.getContainer(op).getInstance(classname); - TupleRecorder tupleRecorder = instance.getTupleRecorder(op.getId(), null); - return tupleRecorder; + return instance.getTupleRecorder(op.getId(), null); } public class Tuple @@ -89,8 +88,7 @@ public class TupleRecorderTest @Test public void testRecorder() throws IOException { - FileSystem fs = new LocalFileSystem(); - try { + try (FileSystem fs = new LocalFileSystem()) { TupleRecorder recorder = new TupleRecorder(null, "application_test_id_1"); recorder.getStorage().setBytesPerPartFile(4096); recorder.getStorage().setLocalMode(true); @@ -132,80 +130,76 @@ public class TupleRecorderTest fs.initialize((new Path(recorder.getStorage().getBasePath()).toUri()), new Configuration()); Path path; - FSDataInputStream is; String line; - BufferedReader br; path = new Path(recorder.getStorage().getBasePath(), FSPartFileCollection.INDEX_FILE); - is = fs.open(path); - br = new BufferedReader(new InputStreamReader(is)); - - line = br.readLine(); - // Assert.assertEquals("check index", "B:1000:T:0:part0.txt", line); - Assert.assertTrue("check 
index", line.matches("F:part0.txt:\\d+-\\d+:4:T:1000-1000:33:\\{\"3\":\"1\",\"1\":\"1\",\"0\":\"1\",\"2\":\"1\"\\}")); + try (FSDataInputStream is = fs.open(path); + BufferedReader br = new BufferedReader(new InputStreamReader(is))) { + line = br.readLine(); + // Assert.assertEquals("check index", "B:1000:T:0:part0.txt", line); + Assert.assertTrue("check index", line + .matches("F:part0.txt:\\d+-\\d+:4:T:1000-1000:33:\\{\"3\":\"1\",\"1\":\"1\",\"0\":\"1\",\"2\":\"1\"\\}")); + } path = new Path(recorder.getStorage().getBasePath(), FSPartFileCollection.META_FILE); - is = fs.open(path); - br = new BufferedReader(new InputStreamReader(is)); - - ObjectMapper mapper = new ObjectMapper(); - line = br.readLine(); - Assert.assertEquals("check version", "1.2", line); - br.readLine(); // RecordInfo - //RecordInfo ri = mapper.readValue(line, RecordInfo.class); - line = br.readLine(); - PortInfo pi = mapper.readValue(line, PortInfo.class); - Assert.assertEquals("port1", recorder.getPortInfoMap().get(pi.name).id, pi.id); - Assert.assertEquals("port1", recorder.getPortInfoMap().get(pi.name).type, pi.type); - line = br.readLine(); - pi = mapper.readValue(line, PortInfo.class); - Assert.assertEquals("port2", recorder.getPortInfoMap().get(pi.name).id, pi.id); - Assert.assertEquals("port2", recorder.getPortInfoMap().get(pi.name).type, pi.type); - line = br.readLine(); - pi = mapper.readValue(line, PortInfo.class); - Assert.assertEquals("port3", recorder.getPortInfoMap().get(pi.name).id, pi.id); - Assert.assertEquals("port3", recorder.getPortInfoMap().get(pi.name).type, pi.type); - line = br.readLine(); - pi = mapper.readValue(line, PortInfo.class); - Assert.assertEquals("port4", recorder.getPortInfoMap().get(pi.name).id, pi.id); - Assert.assertEquals("port4", recorder.getPortInfoMap().get(pi.name).type, pi.type); - Assert.assertEquals("port size", 4, recorder.getPortInfoMap().size()); - //line = br.readLine(); - + try (FSDataInputStream is = fs.open(path); + BufferedReader br = new 
BufferedReader(new InputStreamReader(is))) { + + ObjectMapper mapper = new ObjectMapper(); + line = br.readLine(); + Assert.assertEquals("check version", "1.2", line); + br.readLine(); // RecordInfo + //RecordInfo ri = mapper.readValue(line, RecordInfo.class); + line = br.readLine(); + PortInfo pi = mapper.readValue(line, PortInfo.class); + Assert.assertEquals("port1", recorder.getPortInfoMap().get(pi.name).id, pi.id); + Assert.assertEquals("port1", recorder.getPortInfoMap().get(pi.name).type, pi.type); + line = br.readLine(); + pi = mapper.readValue(line, PortInfo.class); + Assert.assertEquals("port2", recorder.getPortInfoMap().get(pi.name).id, pi.id); + Assert.assertEquals("port2", recorder.getPortInfoMap().get(pi.name).type, pi.type); + line = br.readLine(); + pi = mapper.readValue(line, PortInfo.class); + Assert.assertEquals("port3", recorder.getPortInfoMap().get(pi.name).id, pi.id); + Assert.assertEquals("port3", recorder.getPortInfoMap().get(pi.name).type, pi.type); + line = br.readLine(); + pi = mapper.readValue(line, PortInfo.class); + Assert.assertEquals("port4", recorder.getPortInfoMap().get(pi.name).id, pi.id); + Assert.assertEquals("port4", recorder.getPortInfoMap().get(pi.name).type, pi.type); + Assert.assertEquals("port size", 4, recorder.getPortInfoMap().size()); + //line = br.readLine(); + } path = new Path(recorder.getStorage().getBasePath(), "part0.txt"); - is = fs.open(path); - br = new BufferedReader(new InputStreamReader(is)); + try (FSDataInputStream is = fs.open(path); + BufferedReader br = new BufferedReader(new InputStreamReader(is))) { - line = br.readLine(); - Assert.assertTrue("check part0", line.startsWith("B:")); - Assert.assertTrue("check part0", line.endsWith(":1000")); + line = br.readLine(); + Assert.assertTrue("check part0", line.startsWith("B:")); + Assert.assertTrue("check part0", line.endsWith(":1000")); - line = br.readLine(); - Assert.assertTrue("check part0 1", line.startsWith("T:")); - Assert.assertTrue("check part0 1", 
line.endsWith(":0:30:{\"key\":\"speed\",\"value\":\"5m/h\"}")); + line = br.readLine(); + Assert.assertTrue("check part0 1", line.startsWith("T:")); + Assert.assertTrue("check part0 1", line.endsWith(":0:30:{\"key\":\"speed\",\"value\":\"5m/h\"}")); - line = br.readLine(); - Assert.assertTrue("check part0 2", line.startsWith("T:")); - Assert.assertTrue("check part0 2", line.endsWith(":2:30:{\"key\":\"speed\",\"value\":\"4m/h\"}")); + line = br.readLine(); + Assert.assertTrue("check part0 2", line.startsWith("T:")); + Assert.assertTrue("check part0 2", line.endsWith(":2:30:{\"key\":\"speed\",\"value\":\"4m/h\"}")); - line = br.readLine(); - Assert.assertTrue("check part0 3", line.startsWith("T:")); - Assert.assertTrue("check part0 3", line.endsWith(":1:30:{\"key\":\"speed\",\"value\":\"6m/h\"}")); + line = br.readLine(); + Assert.assertTrue("check part0 3", line.startsWith("T:")); + Assert.assertTrue("check part0 3", line.endsWith(":1:30:{\"key\":\"speed\",\"value\":\"6m/h\"}")); - line = br.readLine(); - Assert.assertTrue("check part0 4", line.startsWith("T:")); - Assert.assertTrue("check part0 4", line.endsWith(":3:30:{\"key\":\"speed\",\"value\":\"2m/h\"}")); + line = br.readLine(); + Assert.assertTrue("check part0 4", line.startsWith("T:")); + Assert.assertTrue("check part0 4", line.endsWith(":3:30:{\"key\":\"speed\",\"value\":\"2m/h\"}")); - line = br.readLine(); - Assert.assertTrue("check part0 5", line.startsWith("E:")); - Assert.assertTrue("check part0 5", line.endsWith(":1000")); - } - catch (IOException ex) { + line = br.readLine(); + Assert.assertTrue("check part0 5", line.startsWith("E:")); + Assert.assertTrue("check part0 5", line.endsWith(":1000")); + } + } catch (IOException ex) { throw new RuntimeException(ex); } - finally { - fs.close(); - } } private static final File testWorkDir = new File("target", TupleRecorderTest.class.getName()); @@ -234,17 +228,17 @@ public class TupleRecorderTest final PTOperator ptOp2 = 
localCluster.findByLogicalNode(dag.getMeta(op2)); StramTestSupport.waitForActivation(localCluster, ptOp2); - testRecordingOnOperator(localCluster, ptOp2, 2); + testRecordingOnOperator(localCluster, ptOp2); final PTOperator ptOp1 = localCluster.findByLogicalNode(dag.getMeta(op1)); StramTestSupport.waitForActivation(localCluster, ptOp1); - testRecordingOnOperator(localCluster, ptOp1, 1); + testRecordingOnOperator(localCluster, ptOp1); localCluster.shutdown(); } - private void testRecordingOnOperator(final StramLocalCluster localCluster, final PTOperator op, int numPorts) throws Exception + private void testRecordingOnOperator(final StramLocalCluster localCluster, final PTOperator op) throws Exception { String id = "xyz"; localCluster.getStreamingContainerManager().startRecording(id, op.getId(), null, 0); @@ -259,25 +253,30 @@ public class TupleRecorderTest }; Assert.assertTrue("Should get a tuple recorder within 10 seconds", StramTestSupport.awaitCompletion(c, 10000)); - TupleRecorder tupleRecorder = getTupleRecorder(localCluster, op); + final TupleRecorder tupleRecorder = getTupleRecorder(localCluster, op); long startTime = tupleRecorder.getStartTime(); - BufferedReader br; String line; File dir = new File(testWorkDir, "recordings/" + op.getId() + "/" + id); File file; - file = new File(dir, "meta.txt"); + file = new File(dir, FSPartFileCollection.META_FILE); Assert.assertTrue("meta file should exist", file.exists()); - br = new BufferedReader(new FileReader(file)); - line = br.readLine(); - Assert.assertEquals("version should be 1.2", "1.2", line); - line = br.readLine(); - JSONObject json = new JSONObject(line); - Assert.assertEquals("Start time verification", startTime, json.getLong("startTime")); - - for (int i = 0; i < numPorts; i++) { + int numPorts = tupleRecorder.getSinkMap().size(); + + try (BufferedReader br = new BufferedReader(new FileReader(file))) { line = br.readLine(); - Assert.assertTrue("should contain name, streamName, type and id", line != null 
&& line.contains("\"name\"") && line.contains("\"streamName\"") && line.contains("\"type\"") && line.contains("\"id\"")); + Assert.assertEquals("version should be 1.2", "1.2", line); + line = br.readLine(); + JSONObject json = new JSONObject(line); + Assert.assertEquals("Start time verification", startTime, json.getLong("startTime")); + Assert.assertTrue(numPorts > 0); + + for (int i = 0; i < numPorts; i++) { + line = br.readLine(); + Assert.assertTrue("should contain name, streamName, type and id", line != null && line + .contains("\"name\"") && line.contains("\"streamName\"") && line.contains("\"type\"") && line + .contains("\"id\"")); + } } c = new WaitCondition() @@ -285,7 +284,6 @@ public class TupleRecorderTest @Override public boolean isComplete() { - TupleRecorder tupleRecorder = getTupleRecorder(localCluster, op); return (tupleRecorder.getTotalTupleCount() >= testTupleCount); } @@ -306,24 +304,23 @@ public class TupleRecorderTest }; Assert.assertTrue("Tuple recorder shouldn't exist any more after stopping", StramTestSupport.awaitCompletion(c, 5000)); - file = new File(dir, "index.txt"); + file = new File(dir, FSPartFileCollection.INDEX_FILE); Assert.assertTrue("index file should exist", file.exists()); - br = new BufferedReader(new FileReader(file)); - ArrayList<String> partFiles = new ArrayList<String>(); + ArrayList<String> partFiles = new ArrayList<>(); int indexCount = 0; - while ((line = br.readLine()) != null) { - String partFile = "part" + indexCount + ".txt"; - if (line.startsWith("F:" + partFile + ":")) { - partFiles.add(partFile); - indexCount++; - } - else if (line.startsWith("E")) { - Assert.assertEquals("index file should end after E line", br.readLine(), null); - break; - } - else { - Assert.fail("index file line is not starting with F or E"); + try (BufferedReader br = new BufferedReader(new FileReader(file))) { + while ((line = br.readLine()) != null) { + String partFile = "part" + indexCount + ".txt"; + if (line.startsWith("F:" + partFile 
+ ":")) { + partFiles.add(partFile); + indexCount++; + } else if (line.startsWith("E")) { + Assert.assertEquals("index file should end after E line", br.readLine(), null); + break; + } else { + Assert.fail("index file line is not starting with F or E"); + } } } @@ -337,17 +334,16 @@ public class TupleRecorderTest Assert.assertTrue(partFile + " should be greater than 1KB", file.length() >= 1024); } Assert.assertTrue(partFile + " should exist", file.exists()); - br = new BufferedReader(new FileReader(file)); - while ((line = br.readLine()) != null) { - if (line.startsWith("B:")) { - beginWindowExists = true; - } - else if (line.startsWith("E:")) { - endWindowExists = true; - } - else if (line.startsWith("T:")) { - String[] parts = line.split(":"); - tupleCount[Integer.valueOf(parts[2])]++; + try (BufferedReader br = new BufferedReader(new FileReader(file))) { + while ((line = br.readLine()) != null) { + if (line.startsWith("B:")) { + beginWindowExists = true; + } else if (line.startsWith("E:")) { + endWindowExists = true; + } else if (line.startsWith("T:")) { + String[] parts = line.split(":"); + tupleCount[Integer.valueOf(parts[2])]++; + } } } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java index c7e8ccc..2577504 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java @@ -277,6 +277,8 @@ public class GenericNodeTest gn.connectInputPort("ip1", reservoir1); gn.connectInputPort("ip2", reservoir2); gn.connectOutputPort("op", output); + gn.firstWindowMillis = 0; + gn.windowWidthMillis = 100; final AtomicBoolean ab = new AtomicBoolean(false); Thread 
t = new Thread() @@ -382,6 +384,8 @@ public class GenericNodeTest gn.connectInputPort("ip1", reservoir1); gn.connectInputPort("ip2", reservoir2); gn.connectOutputPort("op", Sink.BLACKHOLE); + gn.firstWindowMillis = 0; + gn.windowWidthMillis = 100; final AtomicBoolean ab = new AtomicBoolean(false); Thread t = new Thread() @@ -493,6 +497,8 @@ public class GenericNodeTest in.connectInputPort("ip1", windowGenerator.acquireReservoir(String.valueOf(in.id), 1024)); in.connectOutputPort("output", testSink); + in.firstWindowMillis = 0; + in.windowWidthMillis = 100; windowGenerator.activate(null); @@ -551,9 +557,13 @@ public class GenericNodeTest final long sleepTime = 25L; WindowGenerator windowGenerator = new WindowGenerator(new ScheduledThreadPoolExecutor(1, "WindowGenerator"), 1024); - windowGenerator.setResetWindow(0L); - windowGenerator.setFirstWindow(1448909287863L); - windowGenerator.setWindowWidth(100); + long resetWindow = 0L; + long firstWindowMillis = 1448909287863L; + int windowWidth = 100; + + windowGenerator.setResetWindow(resetWindow); + windowGenerator.setFirstWindow(firstWindowMillis); + windowGenerator.setWindowWidth(windowWidth); windowGenerator.setCheckpointCount(1, 0); GenericOperator go = new GenericOperator(); @@ -576,6 +586,8 @@ public class GenericNodeTest gn.connectInputPort("ip1", windowGenerator.acquireReservoir(String.valueOf(gn.id), 1024)); gn.connectOutputPort("output", testSink); + gn.firstWindowMillis = firstWindowMillis; + gn.windowWidthMillis = windowWidth; windowGenerator.activate(null); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java index 0c8ae62..a3b0c53 100644 --- 
a/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java @@ -132,6 +132,9 @@ public class GenericTestOperator extends BaseOperator { if (outport1.isConnected()) { outport1.emit(o); } + if (outport2.isConnected()) { + outport2.emit(o); + } } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java new file mode 100644 index 0000000..359da17 --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.stram.plan.logical; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.validation.ValidationException; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.common.util.DefaultDelayOperator; +import com.datatorrent.stram.StramLocalCluster; +import com.datatorrent.stram.engine.GenericTestOperator; +import com.datatorrent.stram.engine.TestGeneratorInputOperator; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Unit tests for topologies with delay operator + */ +public class DelayOperatorTest +{ + private static Lock sequential = new ReentrantLock(); + + @Before + public void setup() + { + sequential.lock(); + } + + @After + public void teardown() + { + sequential.unlock(); + } + + @Test + public void testInvalidDelayDetection() + { + LogicalPlan dag = new LogicalPlan(); + + GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class); + GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class); + GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class); + DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class); + + dag.addStream("BtoC", opB.outport1, opC.inport1); + dag.addStream("CtoD", opC.outport1, opD.inport1); + dag.addStream("CtoDelay", opC.outport2, opDelay.input); + dag.addStream("DelayToD", opDelay.output, opD.inport2); + + List<List<String>> invalidDelays = new 
ArrayList<>();
    // Tail of an invalid-delay validation test whose opening lines are above this
    // chunk: opB/opC/opD/opDelay and invalidDelays are declared there.
    // findInvalidDelays must flag exactly one offending delay reachable from B.
    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
    assertEquals("operator invalid delay", 1, invalidDelays.size());

    try {
      dag.validate();
      fail("validation should fail");
    } catch (ValidationException e) {
      // expected
    }

    // Case: a delay operator with APPLICATION_WINDOW_COUNT != 1 is invalid,
    // because a delay must shift the stream by exactly one streaming window.
    dag = new LogicalPlan();

    opB = dag.addOperator("B", GenericTestOperator.class);
    opC = dag.addOperator("C", GenericTestOperator.class);
    opD = dag.addOperator("D", GenericTestOperator.class);
    opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
    dag.setAttribute(opDelay, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 2);
    dag.addStream("BtoC", opB.outport1, opC.inport1);
    dag.addStream("CtoD", opC.outport1, opD.inport1);
    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
    dag.addStream("DelayToC", opDelay.output, opC.inport2);

    invalidDelays = new ArrayList<>();
    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
    assertEquals("operator invalid delay", 1, invalidDelays.size());

    try {
      dag.validate();
      fail("validation should fail");
    } catch (ValidationException e) {
      // expected
    }

    // Case: THREAD_LOCAL locality on the delay loop is invalid — a delay edge
    // cannot be collapsed into the same thread as the operator it feeds back to.
    dag = new LogicalPlan();

    opB = dag.addOperator("B", GenericTestOperator.class);
    opC = dag.addOperator("C", GenericTestOperator.class);
    opD = dag.addOperator("D", GenericTestOperator.class);
    opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
    dag.addStream("BtoC", opB.outport1, opC.inport1);
    dag.addStream("CtoD", opC.outport1, opD.inport1);
    dag.addStream("CtoDelay", opC.outport2, opDelay.input).setLocality(DAG.Locality.THREAD_LOCAL);
    dag.addStream("DelayToC", opDelay.output, opC.inport2).setLocality(DAG.Locality.THREAD_LOCAL);

    try {
      dag.validate();
      fail("validation should fail");
    } catch (ValidationException e) {
      // expected
    }
  }

  /**
   * Verifies that a well-formed feedback loop (A -> B -> C, with C's second
   * output routed through a delay operator back into B) passes DAG validation.
   */
  @Test
  public void testValidDelay()
  {
    LogicalPlan dag = new LogicalPlan();

    TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
    GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
    GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
    GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
    DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);

    dag.addStream("AtoB", opA.outport, opB.inport1);
    dag.addStream("BtoC", opB.outport1, opC.inport1);
    dag.addStream("CtoD", opC.outport1, opD.inport1);
    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
    dag.addStream("DelayToB", opDelay.output, opB.inport2);
    dag.validate();
  }

  // First 40 Fibonacci numbers; used as the expected output of the iteration
  // (delay-loop) tests below.
  public static final Long[] FIBONACCI_NUMBERS = new Long[]{
      1L, 1L, 2L, 3L, 5L, 8L, 13L, 21L, 34L, 55L, 89L, 144L, 233L, 377L, 610L, 987L, 1597L, 2584L, 4181L, 6765L,
      10946L, 17711L, 28657L, 46368L, 75025L, 121393L, 196418L, 317811L, 514229L, 832040L, 1346269L, 2178309L,
      3524578L, 5702887L, 9227465L, 14930352L, 24157817L, 39088169L, 63245986L, 102334155L
  };

  /**
   * Computes the Fibonacci sequence one number per window, by receiving its own
   * previous output through a delay-operator feedback loop on {@code input}.
   * Results are accumulated in a static list so the test can inspect them after
   * the local cluster shuts down.
   */
  public static class FibonacciOperator extends BaseOperator
  {
    // Static so that results survive operator redeployment during recovery tests.
    public static List<Long> results = new ArrayList<>();
    public long currentNumber = 1;
    // Last value received from the delay loop this window; transient because it
    // is recomputed every window and must not be part of the checkpointed state.
    private transient long tempNum;

    // Connected to an input operator only to drive the window lifecycle; tuples
    // are ignored.
    public transient DefaultInputPort<Object> dummyInputPort = new DefaultInputPort<Object>()
    {
      @Override
      public void process(Object tuple)
      {
      }
    };
    public transient DefaultInputPort<Long> input = new DefaultInputPort<Long>()
    {
      @Override
      public void process(Long tuple)
      {
        tempNum = tuple;
      }
    };
    public transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();

    @Override
    public void endWindow()
    {
      output.emit(currentNumber);
      results.add(currentNumber);
      currentNumber += tempNum;
      if (currentNumber <= 0) {
        // overflow
        currentNumber = 1;
      }
    }

  }

  /**
   * FibonacciOperator that throws once after a configurable number of windows
   * (optionally only after the first committed checkpoint), to exercise
   * recovery of an operator inside a delay loop.
   */
  public static class FailableFibonacciOperator extends FibonacciOperator implements Operator.CheckpointListener
  {
    private boolean committed = false;
    private int simulateFailureWindows = 0;
    private boolean simulateFailureAfterCommit = false;
    private int windowCount = 0;
    // Static so the test can observe the failure after the operator instance is
    // redeployed, and so the failure happens only once across restarts.
    public static boolean failureSimulated = false;

    @Override
    public void beginWindow(long windowId)
    {
      if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit && committed) || !simulateFailureAfterCommit) &&
          !failureSimulated) {
        if (windowCount++ == simulateFailureWindows) {
          failureSimulated = true;
          throw new RuntimeException("simulating failure");
        }
      }
    }

    @Override
    public void checkpointed(long windowId)
    {
    }

    @Override
    public void committed(long windowId)
    {
      committed = true;
    }

    public void setSimulateFailureWindows(int windows, boolean afterCommit)
    {
      this.simulateFailureAfterCommit = afterCommit;
      this.simulateFailureWindows = windows;
    }
  }

  /**
   * DefaultDelayOperator that throws once after a configurable number of
   * windows (optionally only after the first committed checkpoint), to
   * exercise recovery of the delay operator itself.
   */
  public static class FailableDelayOperator extends DefaultDelayOperator implements Operator.CheckpointListener
  {
    private boolean committed = false;
    private int simulateFailureWindows = 0;
    private boolean simulateFailureAfterCommit = false;
    private int windowCount = 0;
    private static boolean failureSimulated = false;

    @Override
    public void beginWindow(long windowId)
    {
      super.beginWindow(windowId);
      if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit && committed) || !simulateFailureAfterCommit) &&
          !failureSimulated) {
        if (windowCount++ == simulateFailureWindows) {
          failureSimulated = true;
          throw new RuntimeException("simulating failure");
        }
      }
    }

    @Override
    public void checkpointed(long windowId)
    {
    }

    @Override
    public void committed(long windowId)
    {
      committed = true;
    }

    public void setSimulateFailureWindows(int windows, boolean afterCommit)
    {
      this.simulateFailureAfterCommit = afterCommit;
      this.simulateFailureWindows = windows;
    }
  }


  /**
   * End-to-end iteration test: runs DUMMY -> FIB with FIB's output fed back
   * through a delay operator, and checks that the first 10 results are the
   * Fibonacci numbers.
   */
  @Test
  public void testFibonacci() throws Exception
  {
    LogicalPlan dag = new LogicalPlan();

    TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
    FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class);
    DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);

    dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
    dag.addStream("operator_to_delay", fib.output, opDelay.input);
    dag.addStream("delay_to_operator", opDelay.output, fib.input);
    FibonacciOperator.results.clear();
    final StramLocalCluster localCluster = new StramLocalCluster(dag);
    localCluster.setExitCondition(new Callable<Boolean>()
    {
      @Override
      public Boolean call() throws Exception
      {
        return FibonacciOperator.results.size() >= 10;
      }
    });
    localCluster.run(10000);
    Assert.assertArrayEquals(Arrays.copyOfRange(FIBONACCI_NUMBERS, 0, 10),
        FibonacciOperator.results.subList(0, 10).toArray());
  }

  /**
   * Same loop as {@link #testFibonacci()}, but the Fibonacci operator fails
   * once after commit; verifies that after recovery the loop still produces
   * Fibonacci numbers. Compared as sorted sets because windows replayed during
   * recovery may duplicate entries in the results list.
   */
  @Ignore // Out of sequence BEGIN_WINDOW tuple on Travis. Will tackle in the next version
  @Test
  public void testFibonacciRecovery1() throws Exception
  {
    LogicalPlan dag = new LogicalPlan();

    TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
    FailableFibonacciOperator fib = dag.addOperator("FIB", FailableFibonacciOperator.class);
    DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);

    fib.setSimulateFailureWindows(3, true);

    dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
    dag.addStream("operator_to_delay", fib.output, opDelay.input);
    dag.addStream("delay_to_operator", opDelay.output, fib.input);
    dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
    dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
    FailableFibonacciOperator.results.clear();
    FailableFibonacciOperator.failureSimulated = false;
    final StramLocalCluster localCluster = new StramLocalCluster(dag);
    localCluster.setPerContainerBufferServer(true);
    localCluster.setExitCondition(new Callable<Boolean>()
    {
      @Override
      public Boolean call() throws Exception
      {
        return FailableFibonacciOperator.results.size() >= 30;
      }
    });
    localCluster.run(60000);
    Assert.assertTrue("failure should be invoked", FailableFibonacciOperator.failureSimulated);
    Assert.assertArrayEquals(Arrays.copyOfRange(new TreeSet<>(Arrays.asList(FIBONACCI_NUMBERS)).toArray(), 0, 20),
        Arrays.copyOfRange(new TreeSet<>(FibonacciOperator.results).toArray(), 0, 20));
  }

  /**
   * Same loop, but here the delay operator itself fails once after commit;
   * verifies recovery when the failing operator is the delay operator.
   */
  @Ignore // Out of sequence BEGIN_WINDOW tuple on Travis. Will tackle in the next version
  @Test
  public void testFibonacciRecovery2() throws Exception
  {
    LogicalPlan dag = new LogicalPlan();

    TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
    FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class);
    FailableDelayOperator opDelay = dag.addOperator("opDelay", FailableDelayOperator.class);

    opDelay.setSimulateFailureWindows(5, true);

    dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
    dag.addStream("operator_to_delay", fib.output, opDelay.input);
    dag.addStream("delay_to_operator", opDelay.output, fib.input);
    dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
    dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
    FibonacciOperator.results.clear();
    FailableDelayOperator.failureSimulated = false;
    final StramLocalCluster localCluster = new StramLocalCluster(dag);
    localCluster.setPerContainerBufferServer(true);
    localCluster.setExitCondition(new Callable<Boolean>()
    {
      @Override
      public Boolean call() throws Exception
      {
        return FibonacciOperator.results.size() >= 30;
      }
    });
    localCluster.run(60000);

    Assert.assertTrue("failure should be invoked", FailableDelayOperator.failureSimulated);
    Assert.assertArrayEquals(Arrays.copyOfRange(new TreeSet<>(Arrays.asList(FIBONACCI_NUMBERS)).toArray(), 0, 20),
        Arrays.copyOfRange(new TreeSet<>(FibonacciOperator.results).toArray(), 0, 20));
  }


}
