Repository: apex-core Updated Branches: refs/heads/master b4a4e0517 -> c42f26e01
APEXCORE-597 BufferServer needs to shutdown all created execution services Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/075dd483 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/075dd483 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/075dd483 Branch: refs/heads/master Commit: 075dd483af53e0eb16bd53df6108fcfd8df102e0 Parents: 3c063a4 Author: Vlad Rozov <[email protected]> Authored: Sun Feb 12 21:17:34 2017 -0800 Committer: Vlad Rozov <[email protected]> Committed: Wed Mar 22 09:14:12 2017 -0700 ---------------------------------------------------------------------- .../datatorrent/bufferserver/server/Server.java | 126 +++++++++++++++---- .../bufferserver/client/SubscriberTest.java | 6 +- .../bufferserver/server/ServerTest.java | 6 +- .../bufferserver/storage/DiskStorageTest.java | 6 +- .../datatorrent/stram/StramLocalCluster.java | 6 +- .../stram/engine/StreamingContainer.java | 8 +- .../stram/engine/GenericNodeTest.java | 6 +- .../stram/stream/FastStreamTest.java | 6 +- .../stram/stream/SocketStreamTest.java | 6 +- 9 files changed, 126 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index e0fe704..7ac518b 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; @@ -50,9 +51,9 @@ import com.datatorrent.bufferserver.packet.Tuple; import com.datatorrent.bufferserver.storage.Storage; import com.datatorrent.common.util.NameableThreadFactory; import com.datatorrent.netlet.AbstractLengthPrependerClient; +import com.datatorrent.netlet.AbstractServer; import com.datatorrent.netlet.DefaultEventLoop; import com.datatorrent.netlet.EventLoop; -import com.datatorrent.netlet.Listener.ServerListener; import com.datatorrent.netlet.WriteOnlyLengthPrependerClient; import com.datatorrent.netlet.util.VarInt; @@ -62,17 +63,17 @@ import com.datatorrent.netlet.util.VarInt; * * @since 0.3.2 */ -public class Server implements ServerListener +public class Server extends AbstractServer { public static final int DEFAULT_BUFFER_SIZE = 64 * 1024 * 1024; public static final int DEFAULT_NUMBER_OF_CACHED_BLOCKS = 8; private final int port; private String identity; private Storage storage; - private EventLoop eventloop; - private InetSocketAddress address; + private final EventLoop eventloop; private final ExecutorService serverHelperExecutor; private final ExecutorService storageHelperExecutor; + private volatile CountDownLatch latch; private byte[] authToken; @@ -81,13 +82,14 @@ public class Server implements ServerListener /** * @param port - port number to bind to or 0 to auto select a free port */ - public Server(int port) + public Server(EventLoop eventloop, int port) { - this(port, DEFAULT_BUFFER_SIZE, DEFAULT_NUMBER_OF_CACHED_BLOCKS); + this(eventloop, port, DEFAULT_BUFFER_SIZE, DEFAULT_NUMBER_OF_CACHED_BLOCKS); } - public Server(int port, int blocksize, int numberOfCacheBlocks) + public Server(EventLoop eventloop, int port, int blocksize, int numberOfCacheBlocks) { + this.eventloop = eventloop; this.port = port; this.blockSize = blocksize; this.numberOfCacheBlocks = numberOfCacheBlocks; @@ -104,12 +106,12 @@ public class Server implements ServerListener } @Override - public synchronized void registered(SelectionKey key) + public void registered(SelectionKey key) { - ServerSocketChannel channel = (ServerSocketChannel)key.channel(); - address = (InetSocketAddress)channel.socket().getLocalSocketAddress(); - logger.info("Server started listening at {}", address); - notifyAll(); + super.registered(key); + logger.info("Server started listening at {}", getServerAddress()); + latch.countDown(); + latch = null; } @Override @@ -119,7 +121,7 @@ public class Server implements ServerListener ln.boot(); } /* - * There may be unregister tasks scheduled to run on the event loop that use serverHelperExecutor. + * There may be un-register tasks scheduled to run on the event loop that use serverHelperExecutor. */ eventloop.submit(new Runnable() { @@ -130,27 +132,100 @@ public class Server implements ServerListener storageHelperExecutor.shutdown(); try { serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); + storageHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); } catch (InterruptedException ex) { logger.debug("Executor Termination", ex); } - logger.info("Server stopped listening at {}", address); + logger.info("Server stopped listening at {}", getServerAddress()); + latch.countDown(); + latch = null; } }); } - public synchronized InetSocketAddress run(EventLoop eventloop) + public InetSocketAddress run() { + final CountDownLatch latch = new CountDownLatch(1); + this.latch = latch; eventloop.start(null, port, this); - while (address == null) { - try { - wait(20); - } catch (InterruptedException ex) { - throw new RuntimeException(ex); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return (InetSocketAddress)getServerAddress(); + } + + public InetSocketAddress run(long time) + { + if (time < 0) { + throw new IllegalArgumentException(String.format("Wait time %d can't be negative", time)); + } + final CountDownLatch latch = new CountDownLatch(1); + this.latch = latch; + eventloop.start(null, port, this); + final long deadline = System.currentTimeMillis() + time; + try { + while (latch.getCount() != 0 && time > 0 && latch.await(time, TimeUnit.MILLISECONDS)) { + time = deadline - System.currentTimeMillis(); } + } catch (InterruptedException e) { + throw new RuntimeException(e); } + return (InetSocketAddress)getServerAddress(); + } - this.eventloop = eventloop; - return address; + public void stop() + { + final CountDownLatch latch = new CountDownLatch(1); + this.latch = latch; + eventloop.stop(this); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + shutdownExecutors(latch.getCount() == 0); + } + } + + public void stop(long time) + { + if (time < 0) { + throw new IllegalArgumentException(String.format("Wait time %d can't be negative", time)); + } + final CountDownLatch latch = new CountDownLatch(1); + this.latch = latch; + eventloop.stop(this); + final long deadline = System.currentTimeMillis() + time; + try { + while (latch.getCount() != 0 && time > 0 && latch.await(time, TimeUnit.MILLISECONDS)) { + time = deadline - System.currentTimeMillis(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + shutdownExecutors(latch.getCount() == 0); + } + } + + private void shutdownExecutors(boolean isTerminated) + { + if (!isTerminated) { + logger.warn("Buffer server {} did not terminate.", this); + try { + if (!serverHelperExecutor.isTerminated()) { + logger.warn("Forcing termination of {}", serverHelperExecutor); + serverHelperExecutor.shutdownNow(); + } + if (!storageHelperExecutor.isTerminated()) { + logger.warn("Forcing termination of {}", storageHelperExecutor); + storageHelperExecutor.shutdownNow(); + } + } catch (RuntimeException e) { + logger.error("Exception while terminating executors", e); + } + } } public void setAuthToken(byte[] authToken) @@ -173,14 +248,15 @@ public class Server implements ServerListener } DefaultEventLoop eventloop = DefaultEventLoop.createEventLoop("alone"); - eventloop.start(null, port, new Server(port)); - new Thread(eventloop).start(); + Thread thread = eventloop.start(); + new Server(eventloop, port).run(); + thread.join(); } @Override public String toString() { - return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) + "{address=" + address + "}"; + return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) + "{address=" + getServerAddress() + "}"; } private final ConcurrentHashMap<String, DataList> publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1); http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java index 234fb12..267aaa7 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java @@ -66,8 +66,8 @@ public class SubscriberTest eventloopServer.start(); eventloopClient.start(); - instance = new Server(0, 64, 2); - address = instance.run(eventloopServer); + instance = new Server(eventloopServer, 0, 64, 2); + address = instance.run(); assertTrue(address instanceof InetSocketAddress); assertFalse(address.isUnresolved()); } @@ -75,7 +75,7 @@ public class SubscriberTest @AfterClass public static void teardownServerAndClients() { - eventloopServer.stop(instance); + instance.stop(); eventloopServer.stop(); eventloopClient.stop(); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java index b7d8de1..45e4147 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java @@ -70,8 +70,8 @@ public class ServerTest eventloopServer.start(); eventloopClient.start(); - instance = new Server(0, 4096,8); - address = instance.run(eventloopServer); + instance = new Server(eventloopServer, 0, 4096,8); + address = instance.run(); assertTrue(address instanceof InetSocketAddress); assertFalse(address.isUnresolved()); @@ -83,7 +83,7 @@ public class ServerTest @AfterClass public static void teardownServerAndClients() { - eventloopServer.stop(instance); + instance.stop(); eventloopServer.stop(); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java index 86696f4..21168dd 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java @@ -60,10 +60,10 @@ public class DiskStorageTest eventloopClient = DefaultEventLoop.createEventLoop("client"); eventloopClient.start(); - instance = new Server(0, 1024, 8); + instance = new Server(eventloopServer, 0, 1024, 8); instance.setSpoolStorage(new DiskStorage()); - address = instance.run(eventloopServer); + address = instance.run(); assertFalse(address.isUnresolved()); bsp = new Publisher("MyPublisher"); @@ -79,7 +79,7 @@ public class DiskStorageTest @AfterClass public static void teardownServerAndClients() { - eventloopServer.stop(instance); + instance.stop(); eventloopClient.stop(); eventloopServer.stop(); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java index 3ea969e..cfbd047 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java +++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java @@ -465,13 +465,13 @@ public class StramLocalCluster implements Runnable, Controller { if (!perContainerBufferServer) { StreamingContainer.eventloop.start(); - bufferServer = new Server(0, 1024 * 1024, 8); + bufferServer = new Server(StreamingContainer.eventloop, 0, 1024 * 1024, 8); try { bufferServer.setSpoolStorage(new DiskStorage()); } catch (IOException e) { throw new RuntimeException(e); } - bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServer.run(StreamingContainer.eventloop).getPort()); + bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServer.run().getPort()); LOG.info("Buffer server started: {}", bufferServerAddress); } @@ -557,7 +557,7 @@ public class StramLocalCluster implements Runnable, Controller LOG.info("Application finished."); if (!perContainerBufferServer) { - StreamingContainer.eventloop.stop(bufferServer); + bufferServer.stop(); StreamingContainer.eventloop.stop(); } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java index 437070c..e1e2ce8 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java @@ -155,7 +155,7 @@ public class StreamingContainer extends YarnContainerMain private long firstWindowMillis; private int windowWidthMillis; protected InetSocketAddress bufferServerAddress; - protected com.datatorrent.bufferserver.server.Server bufferServer; + protected Server bufferServer; private int checkpointWindowCount; private boolean fastPublisherSubscriber; private StreamingContainerContext containerContext; @@ -224,12 +224,12 @@ public class StreamingContainer extends YarnContainerMain blockCount = bufferServerRAM / blocksize; } // start buffer server, if it was not set externally - bufferServer = new Server(0, blocksize * 1024 * 1024, blockCount); + bufferServer = new Server(eventloop, 0, blocksize * 1024 * 1024, blockCount); bufferServer.setAuthToken(ctx.getValue(StreamingContainerContext.BUFFER_SERVER_TOKEN)); if (ctx.getValue(Context.DAGContext.BUFFER_SPOOLING)) { bufferServer.setSpoolStorage(new DiskStorage()); } - bufferServerAddress = NetUtils.getConnectAddress(bufferServer.run(eventloop)); + bufferServerAddress = NetUtils.getConnectAddress(bufferServer.run()); logger.debug("Buffer server started: {}", bufferServerAddress); } } catch (IOException ex) { @@ -588,7 +588,7 @@ public class StreamingContainer extends YarnContainerMain } if (bufferServer != null) { - eventloop.stop(bufferServer); + bufferServer.stop(); eventloop.stop(); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java index 7d39e34..da5c7b7 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java @@ -373,8 +373,8 @@ public class GenericNodeTest EventLoop eventloop = DefaultEventLoop.createEventLoop("StreamTestEventLoop"); ((DefaultEventLoop)eventloop).start(); - final Server bufferServer = new Server(0); // find random port - final int bufferServerPort = bufferServer.run(eventloop).getPort(); + final Server bufferServer = new Server(eventloop, 0); // find random port + final int bufferServerPort = bufferServer.run().getPort(); final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>(); final BlockingQueue<Object> tuples = new ArrayBlockingQueue<Object>(10); @@ -478,7 +478,7 @@ public class GenericNodeTest Assert.assertEquals("Payload Tuple 3", 3, ((byte[])list.get(7))[5]); if (bufferServer != null) { - eventloop.stop(bufferServer); + bufferServer.stop(); } ((DefaultEventLoop)eventloop).stop(); http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java index 2d940fa..fd67121 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java @@ -70,8 +70,8 @@ public class FastStreamTest public static void setup() throws InterruptedException, IOException, Exception { ((DefaultEventLoop)eventloop).start(); - bufferServer = new Server(0); // find random port - InetSocketAddress bindAddr = bufferServer.run(eventloop); + bufferServer = new Server(eventloop, 0); // find random port + InetSocketAddress bindAddr = bufferServer.run(); bufferServerPort = bindAddr.getPort(); } @@ -79,7 +79,7 @@ public class FastStreamTest public static void tearDown() throws IOException { if (bufferServer != null) { - eventloop.stop(bufferServer); + bufferServer.stop(); } ((DefaultEventLoop)eventloop).stop(); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java index 4094f66..38460ea 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java @@ -81,8 +81,8 @@ public class SocketStreamTest public static void setup() throws InterruptedException, IOException, Exception { ((DefaultEventLoop)eventloop).start(); - bufferServer = new Server(0); // find random port - InetSocketAddress bindAddr = bufferServer.run(eventloop); + bufferServer = new Server(eventloop, 0); // find random port + InetSocketAddress bindAddr = bufferServer.run(); bufferServerPort = bindAddr.getPort(); } @@ -90,7 +90,7 @@ public class SocketStreamTest public static void tearDown() throws IOException { if (bufferServer != null) { - eventloop.stop(bufferServer); + bufferServer.stop(); } ((DefaultEventLoop)eventloop).stop(); }
