Repository: apex-core Updated Branches: refs/heads/master 2c024cd84 -> fc3246e11
APEXCORE-505 The heartbeat loop was blocked waiting for operator activation. The reason is that stream activation (only BufferServerSubscriber and WindowGenerator) waited for operator activation in the heartbeat thread. After analysis and sanity testing, we see no need for synchronization between operator and stream activation, so all streams are now activated up front and the CountDownLatch wait for operator activation is removed. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/fc3246e1 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/fc3246e1 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/fc3246e1 Branch: refs/heads/master Commit: fc3246e11afa426eefe1dcaf5b8aface079d7d10 Parents: 2c024cd Author: Sandesh Hegde <[email protected]> Authored: Fri Nov 4 10:53:03 2016 -0700 Committer: Sandesh Hegde <[email protected]> Committed: Wed Nov 9 16:00:38 2016 -0800 ---------------------------------------------------------------------- .../stram/engine/StreamingContainer.java | 35 +---- .../stram/engine/GenericNodeTest.java | 132 +++++++++++++++++++ .../stram/stream/SocketStreamTest.java | 101 ++++++++++---- 3 files changed, 213 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/fc3246e1/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java index 8e7e0a1..78f3421 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java @@ -38,7 +38,6 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import
java.util.concurrent.CountDownLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1361,14 +1360,11 @@ public class StreamingContainer extends YarnContainerMain public synchronized void activate(final Map<Integer, OperatorDeployInfo> nodeMap, Map<String, ComponentContextPair<Stream, StreamContext>> newStreams) { for (ComponentContextPair<Stream, StreamContext> pair : newStreams.values()) { - if (!(pair.component instanceof BufferServerSubscriber)) { - activeStreams.put(pair.component, pair.context); - pair.component.activate(pair.context); - eventBus.publish(new StreamActivationEvent(pair)); - } + activeStreams.put(pair.component, pair.context); + pair.component.activate(pair.context); + eventBus.publish(new StreamActivationEvent(pair)); } - final CountDownLatch signal = new CountDownLatch(nodeMap.size()); for (final OperatorDeployInfo ndi : nodeMap.values()) { /* * OiO nodes get activated with their primary nodes. @@ -1408,10 +1404,6 @@ public class StreamingContainer extends YarnContainerMain currentdi = null; - for (int i = setOperators.size(); i-- > 0; ) { - signal.countDown(); - } - node.run(); /* this is a blocking call */ } catch (Error error) { int[] operators; @@ -1448,8 +1440,6 @@ public class StreamingContainer extends YarnContainerMain failedNodes.add(ndi.id); logger.error("Shutdown of operator {} failed due to an exception.", ndi, ex); } - } else { - signal.countDown(); } List<Integer> oioNodeIdList = oioGroups.get(ndi.id); @@ -1463,8 +1453,6 @@ public class StreamingContainer extends YarnContainerMain failedNodes.add(oiodi.id); logger.error("Shutdown of operator {} failed due to an exception.", oiodi, ex); } - } else { - signal.countDown(); } } } @@ -1475,23 +1463,6 @@ public class StreamingContainer extends YarnContainerMain thread.start(); } - /** - * we need to make sure that before any of the operators gets the first message, it's activated. 
- */ - try { - signal.await(); - } catch (InterruptedException ex) { - logger.debug("Activation of operators interrupted.", ex); - } - - for (ComponentContextPair<Stream, StreamContext> pair : newStreams.values()) { - if (pair.component instanceof BufferServerSubscriber) { - activeStreams.put(pair.component, pair.context); - pair.component.activate(pair.context); - eventBus.publish(new StreamActivationEvent(pair)); - } - } - for (WindowGenerator wg : generators.values()) { if (!activeGenerators.containsKey(wg)) { activeGenerators.put(wg, generators); http://git-wip-us.apache.org/repos/asf/apex-core/blob/fc3246e1/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java index 5dfa5f3..af99e98 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java @@ -20,9 +20,12 @@ package com.datatorrent.stram.engine; import java.io.File; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Assert; @@ -48,13 +51,21 @@ import com.datatorrent.api.Operator.CheckpointNotificationListener; import com.datatorrent.api.Operator.ProcessingMode; import com.datatorrent.api.Sink; import com.datatorrent.api.Stats.OperatorStats; +import com.datatorrent.api.StreamCodec; import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.bufferserver.packet.MessageType; +import com.datatorrent.bufferserver.packet.PayloadTuple; +import 
com.datatorrent.bufferserver.server.Server; import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.ScheduledExecutorService; import com.datatorrent.common.util.ScheduledThreadPoolExecutor; +import com.datatorrent.netlet.DefaultEventLoop; +import com.datatorrent.netlet.EventLoop; import com.datatorrent.stram.api.Checkpoint; +import com.datatorrent.stram.codec.DefaultStatefulStreamCodec; +import com.datatorrent.stram.stream.BufferServerPublisher; +import com.datatorrent.stram.stream.BufferServerSubscriber; import com.datatorrent.stram.tuple.EndStreamTuple; import com.datatorrent.stram.tuple.EndWindowTuple; import com.datatorrent.stram.tuple.Tuple; @@ -393,6 +404,127 @@ public class GenericNodeTest } @Test + public void testBufferServerSubscriberActivationBeforeOperator() throws InterruptedException, IOException + { + final String streamName = "streamName"; + final String upstreamNodeId = "upstreamNodeId"; + final String downstreamNodeId = "downStreamNodeId"; + + EventLoop eventloop = DefaultEventLoop.createEventLoop("StreamTestEventLoop"); + + ((DefaultEventLoop)eventloop).start(); + final Server bufferServer = new Server(0); // find random port + final int bufferServerPort = bufferServer.run(eventloop).getPort(); + + final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>(); + final BlockingQueue<Object> tuples = new ArrayBlockingQueue<Object>(10); + + GenericTestOperator go = new GenericTestOperator(); + final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator", + new DefaultAttributeMap(), null)); + gn.setId(1); + + Sink<Object> output = new Sink<Object>() + { + @Override + public void put(Object tuple) + { + tuples.add(tuple); + } + + @Override + public int getCount(boolean reset) + { + return 0; + } + }; + + InetSocketAddress socketAddress = new InetSocketAddress("localhost", bufferServerPort); + + StreamContext issContext = new StreamContext(streamName); + 
issContext.setSourceId(upstreamNodeId); + issContext.setSinkId(downstreamNodeId); + issContext.setFinishedWindowId(-1); + issContext.setBufferServerAddress(socketAddress); + issContext.put(StreamContext.CODEC, serde); + issContext.put(StreamContext.EVENT_LOOP, eventloop); + + StreamContext ossContext = new StreamContext(streamName); + ossContext.setSourceId(upstreamNodeId); + ossContext.setSinkId(downstreamNodeId); + ossContext.setBufferServerAddress(socketAddress); + ossContext.put(StreamContext.CODEC, serde); + ossContext.put(StreamContext.EVENT_LOOP, eventloop); + + BufferServerPublisher oss = new BufferServerPublisher(upstreamNodeId, 1024); + oss.setup(ossContext); + oss.activate(ossContext); + + oss.put(new Tuple(MessageType.BEGIN_WINDOW, 0x1L)); + byte[] buff = PayloadTuple.getSerializedTuple(0, 1); + buff[buff.length - 1] = (byte)1; + oss.put(buff); + oss.put(new EndWindowTuple(0x1L)); + oss.put(new Tuple(MessageType.BEGIN_WINDOW, 0x2L)); + buff = PayloadTuple.getSerializedTuple(0, 1); + buff[buff.length - 1] = (byte)2; + oss.put(buff); + oss.put(new EndWindowTuple(0x2L)); + oss.put(new Tuple(MessageType.BEGIN_WINDOW, 0x3L)); + buff = PayloadTuple.getSerializedTuple(0, 1); + buff[buff.length - 1] = (byte)3; + oss.put(buff); + + oss.put(new EndWindowTuple(0x3L)); + oss.put(new EndStreamTuple(0L)); + + BufferServerSubscriber iss = new BufferServerSubscriber(downstreamNodeId, 1024); + iss.setup(issContext); + + gn.connectInputPort(GenericTestOperator.IPORT1, iss.acquireReservoir("testReservoir", 10)); + gn.connectOutputPort(GenericTestOperator.OPORT1, output); + + SweepableReservoir tupleWait = iss.acquireReservoir("testReservoir2", 10); + + iss.activate(issContext); + + while (tupleWait.sweep() == null) { + Thread.sleep(100); + } + + gn.firstWindowMillis = 0; + gn.windowWidthMillis = 100; + + Thread t = new Thread() + { + @Override + public void run() + { + gn.activate(); + gn.run(); + gn.deactivate(); + } + }; + + t.start(); + t.join(); + + 
Assert.assertEquals(10, tuples.size()); + + List<Object> list = new ArrayList<>(tuples); + + Assert.assertEquals("Payload Tuple 1", 1, ((byte[])list.get(1))[5]); + Assert.assertEquals("Payload Tuple 2", 2, ((byte[])list.get(4))[5]); + Assert.assertEquals("Payload Tuple 3", 3, ((byte[])list.get(7))[5]); + + if (bufferServer != null) { + eventloop.stop(bufferServer); + } + + ((DefaultEventLoop)eventloop).stop(); + } + + @Test public void testPrematureTermination() throws InterruptedException { long maxSleep = 5000; http://git-wip-us.apache.org/repos/asf/apex-core/blob/fc3246e1/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java index 7db42da..4094f66 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java @@ -22,8 +22,10 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; @@ -53,6 +55,18 @@ public class SocketStreamTest private static final Logger LOG = LoggerFactory.getLogger(SocketStreamTest.class); private static int bufferServerPort = 0; private static Server bufferServer = null; + + private static final String streamName = "streamName"; + private static final String upstreamNodeId = "upstreamNodeId"; + private static final String downstreamNodeId = "downStreamNodeId"; + + private StreamContext issContext; + private StreamContext ossContext; + private SweepableReservoir reservoir; + private BufferServerSubscriber iss; + private BufferServerPublisher oss; + private 
AtomicInteger messageCount; + static EventLoop eventloop; static { @@ -91,8 +105,53 @@ public class SocketStreamTest @SuppressWarnings({"SleepWhileInLoop"}) public void testBufferServerStream() throws Exception { + iss.activate(issContext); + LOG.debug("input stream activated"); + + oss.activate(ossContext); + LOG.debug("output stream activated"); + + sendMessage(); + } + + /** + * Tests the buffer server stream by sending a tuple on the output stream and + * receiving the same tuple from the input stream, with the following changes: + * + * 1. The reservoir is swept (and must yield no tuple) before the BufferServerSubscriber is activated. + * 2. The BufferServerSubscriber is activated after the messages are sent from the BufferServerPublisher. + * + * @throws Exception + */ + @Test + @SuppressWarnings({"SleepWhileInLoop"}) + public void testBufferServerStreamWithLateActivationForSubscriber() throws Exception + { + for (int i = 0; i < 50; i++) { + Tuple t = reservoir.sweep(); + if (t == null) { + sleep(5); + continue; + } + + throw new Exception("Unexpected control tuple."); + } + + oss.activate(ossContext); + LOG.debug("output stream activated"); + + sendMessage(); + + iss.activate(issContext); + LOG.debug("input stream activated"); + } + + @Before + public void init() + { final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>(); - final AtomicInteger messageCount = new AtomicInteger(); + messageCount = new AtomicInteger(0); + Sink<Object> sink = new Sink<Object>() { @Override @@ -107,14 +166,9 @@ public class SocketStreamTest { throw new UnsupportedOperationException("Not supported yet."); } - }; - String streamName = "streamName"; - String upstreamNodeId = "upstreamNodeId"; - String downstreamNodeId = "downStreamNodeId"; - - StreamContext issContext = new StreamContext(streamName); + issContext = new StreamContext(streamName); issContext.setSourceId(upstreamNodeId); issContext.setSinkId(downstreamNodeId); issContext.setFinishedWindowId(-1); @@ -122,33 +176,25 @@ public class SocketStreamTest
issContext.put(StreamContext.CODEC, serde); issContext.put(StreamContext.EVENT_LOOP, eventloop); - BufferServerSubscriber iss = new BufferServerSubscriber(downstreamNodeId, 1024); + iss = new BufferServerSubscriber(downstreamNodeId, 1024); iss.setup(issContext); - SweepableReservoir reservoir = iss.acquireReservoir("testReservoir", 1); + reservoir = iss.acquireReservoir("testReservoir", 1); reservoir.setSink(sink); - StreamContext ossContext = new StreamContext(streamName); + ossContext = new StreamContext(streamName); ossContext.setSourceId(upstreamNodeId); ossContext.setSinkId(downstreamNodeId); ossContext.setBufferServerAddress(InetSocketAddress.createUnresolved("localhost", bufferServerPort)); ossContext.put(StreamContext.CODEC, serde); ossContext.put(StreamContext.EVENT_LOOP, eventloop); - BufferServerPublisher oss = new BufferServerPublisher(upstreamNodeId, 1024); + oss = new BufferServerPublisher(upstreamNodeId, 1024); oss.setup(ossContext); + } - iss.activate(issContext); - LOG.debug("input stream activated"); - - oss.activate(ossContext); - LOG.debug("output stream activated"); - - LOG.debug("Sending hello message"); - oss.put(StramTestSupport.generateBeginWindowTuple(upstreamNodeId, 0)); - oss.put(StramTestSupport.generateTuple("hello", 0)); - oss.put(StramTestSupport.generateEndWindowTuple(upstreamNodeId, 0)); - oss.put(StramTestSupport.generateBeginWindowTuple(upstreamNodeId, 1)); // it's a spurious tuple, presence of it should not affect the outcome of the test. 
- + @After + public void verify() throws InterruptedException + { for (int i = 0; i < 100; i++) { Tuple t = reservoir.sweep(); if (t == null) { @@ -167,5 +213,14 @@ public class SocketStreamTest Assert.assertEquals("Received messages", 1, messageCount.get()); } + private void sendMessage() + { + LOG.debug("Sending hello message"); + oss.put(StramTestSupport.generateBeginWindowTuple(upstreamNodeId, 0)); + oss.put(StramTestSupport.generateTuple("hello", 0)); + oss.put(StramTestSupport.generateEndWindowTuple(upstreamNodeId, 0)); + oss.put(StramTestSupport.generateBeginWindowTuple(upstreamNodeId, 1)); // it's a spurious tuple, presence of it should not affect the outcome of the test. + } + private static final Logger logger = LoggerFactory.getLogger(SocketStreamTest.class); }
