Repository: incubator-apex-core Updated Branches: refs/heads/devel-3 b1af72403 -> f35522b5a
Skipping endWindow and operator is shutdown prematurely. APEX-58 #resolve Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/18d43731 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/18d43731 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/18d43731 Branch: refs/heads/devel-3 Commit: 18d437315689aeea32c391a9670f7fc17554fed2 Parents: 8d9aa4b Author: Pramod Immaneni <[email protected]> Authored: Tue Oct 27 18:22:32 2015 -0700 Committer: Pramod Immaneni <[email protected]> Committed: Mon Nov 2 10:41:47 2015 -0800 ---------------------------------------------------------------------- .../datatorrent/stram/engine/GenericNode.java | 6 +- .../java/com/datatorrent/stram/engine/Node.java | 33 ++++++-- .../stram/engine/GenericNodeTest.java | 85 ++++++++++++++++++++ 3 files changed, 118 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/18d43731/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java index 3902f37..26ba98a 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java @@ -553,7 +553,11 @@ public class GenericNode extends Node<Operator> } } - if (insideWindow) { + /** + * TODO: If shutdown and inside window provide alternate way of notifying the operator in such ways + * TODO: as using a listener callback + */ + if (insideWindow && !shutdown) { endWindowEmitTime = System.currentTimeMillis(); operator.endWindow(); if (++applicationWindowCount == APPLICATION_WINDOW_COUNT) { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/18d43731/engine/src/main/java/com/datatorrent/stram/engine/Node.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java index b073dcd..c66df12 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java @@ -28,25 +28,44 @@ import java.lang.reflect.Array; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.*; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; -import org.apache.hadoop.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.util.ReflectionUtils; + import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.math.IntMath; -import com.datatorrent.api.*; +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Component; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.InputPort; import com.datatorrent.api.Operator.OutputPort; import com.datatorrent.api.Operator.ProcessingMode; import com.datatorrent.api.Operator.Unifier; +import com.datatorrent.api.Sink; +import com.datatorrent.api.Stats; +import com.datatorrent.api.StatsListener; import com.datatorrent.api.StatsListener.OperatorRequest; - +import com.datatorrent.api.StorageAgent; import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.Pair; @@ -297,6 +316,10 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera logger.warn("Shutdown requested when context is not available!"); } else { + /* + * Since alive is non-volatile this code explicitly unsets it in the operator lifecycle theread thereby notifying + * it even when the thread is reading it from the cache + */ context.request(new OperatorRequest() { @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/18d43731/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java index 9e62ac5..d5ceae6 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java @@ -211,4 +211,89 @@ public class GenericNodeTest Assert.assertEquals(Thread.State.TERMINATED, t.getState()); } + @Test + public void testPrematureTermination() throws InterruptedException + { + long maxSleep = 5000; + long sleeptime = 25L; + GenericOperator go = new GenericOperator(); + final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, new DefaultAttributeMap(), null)); + gn.setId(1); + DefaultReservoir reservoir1 = new DefaultReservoir("ip1Res", 1024); + DefaultReservoir reservoir2 = new DefaultReservoir("ip2Res", 1024); + + gn.connectInputPort("ip1", reservoir1); + gn.connectInputPort("ip2", reservoir2); + gn.connectOutputPort("op", Sink.BLACKHOLE); + + final AtomicBoolean ab = new AtomicBoolean(false); + Thread t = new Thread() + { + @Override + public void run() + { + ab.set(true); + gn.activate(); + gn.run(); + gn.deactivate(); + } + + }; + t.start(); + + long interval = 0; + do { + Thread.sleep(sleeptime); + interval += sleeptime; + } + while ((ab.get() == false) && (interval < maxSleep)); + + + int controlTupleCount = gn.controlTupleCount; + Tuple beginWindow1 = new Tuple(MessageType.BEGIN_WINDOW, 0x1L); + + reservoir1.add(beginWindow1); + reservoir2.add(beginWindow1); + + interval = 0; + do { + Thread.sleep(sleeptime); + interval += sleeptime; + } + while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep)); + Assert.assertTrue("Begin window called", go.endWindowId != go.beginWindowId); + controlTupleCount = gn.controlTupleCount; + + Tuple endWindow1 = new EndWindowTuple(0x1L); + + reservoir1.add(endWindow1); + reservoir2.add(endWindow1); + + interval = 0; + do { + Thread.sleep(sleeptime); + interval += sleeptime; + } + while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep)); + Assert.assertTrue("End window called", go.endWindowId == go.beginWindowId); + controlTupleCount = gn.controlTupleCount; + + Tuple beginWindow2 = new Tuple(MessageType.BEGIN_WINDOW, 0x2L); + + reservoir1.add(beginWindow2); + reservoir2.add(beginWindow2); + + interval = 0; + do { + Thread.sleep(sleeptime); + interval += sleeptime; + } + while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep)); + + gn.shutdown(); + t.join(); + + Assert.assertTrue("End window not called", go.endWindowId != go.beginWindowId); + } + }
