Repository: apex-core
Updated Branches:
  refs/heads/master b4a4e0517 -> c42f26e01


APEXCORE-597 BufferServer needs to shut down all executor services it creates


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/075dd483
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/075dd483
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/075dd483

Branch: refs/heads/master
Commit: 075dd483af53e0eb16bd53df6108fcfd8df102e0
Parents: 3c063a4
Author: Vlad Rozov <[email protected]>
Authored: Sun Feb 12 21:17:34 2017 -0800
Committer: Vlad Rozov <[email protected]>
Committed: Wed Mar 22 09:14:12 2017 -0700

----------------------------------------------------------------------
 .../datatorrent/bufferserver/server/Server.java | 126 +++++++++++++++----
 .../bufferserver/client/SubscriberTest.java     |   6 +-
 .../bufferserver/server/ServerTest.java         |   6 +-
 .../bufferserver/storage/DiskStorageTest.java   |   6 +-
 .../datatorrent/stram/StramLocalCluster.java    |   6 +-
 .../stram/engine/StreamingContainer.java        |   8 +-
 .../stram/engine/GenericNodeTest.java           |   6 +-
 .../stram/stream/FastStreamTest.java            |   6 +-
 .../stram/stream/SocketStreamTest.java          |   6 +-
 9 files changed, 126 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git 
a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java 
b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index e0fe704..7ac518b 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
@@ -50,9 +51,9 @@ import com.datatorrent.bufferserver.packet.Tuple;
 import com.datatorrent.bufferserver.storage.Storage;
 import com.datatorrent.common.util.NameableThreadFactory;
 import com.datatorrent.netlet.AbstractLengthPrependerClient;
+import com.datatorrent.netlet.AbstractServer;
 import com.datatorrent.netlet.DefaultEventLoop;
 import com.datatorrent.netlet.EventLoop;
-import com.datatorrent.netlet.Listener.ServerListener;
 import com.datatorrent.netlet.WriteOnlyLengthPrependerClient;
 import com.datatorrent.netlet.util.VarInt;
 
@@ -62,17 +63,17 @@ import com.datatorrent.netlet.util.VarInt;
  *
  * @since 0.3.2
  */
-public class Server implements ServerListener
+public class Server extends AbstractServer
 {
   public static final int DEFAULT_BUFFER_SIZE = 64 * 1024 * 1024;
   public static final int DEFAULT_NUMBER_OF_CACHED_BLOCKS = 8;
   private final int port;
   private String identity;
   private Storage storage;
-  private EventLoop eventloop;
-  private InetSocketAddress address;
+  private final EventLoop eventloop;
   private final ExecutorService serverHelperExecutor;
   private final ExecutorService storageHelperExecutor;
+  private volatile CountDownLatch latch;
 
   private byte[] authToken;
 
@@ -81,13 +82,14 @@ public class Server implements ServerListener
   /**
    * @param port - port number to bind to or 0 to auto select a free port
    */
-  public Server(int port)
+  public Server(EventLoop eventloop, int port)
   {
-    this(port, DEFAULT_BUFFER_SIZE, DEFAULT_NUMBER_OF_CACHED_BLOCKS);
+    this(eventloop, port, DEFAULT_BUFFER_SIZE, 
DEFAULT_NUMBER_OF_CACHED_BLOCKS);
   }
 
-  public Server(int port, int blocksize, int numberOfCacheBlocks)
+  public Server(EventLoop eventloop, int port, int blocksize, int 
numberOfCacheBlocks)
   {
+    this.eventloop = eventloop;
     this.port = port;
     this.blockSize = blocksize;
     this.numberOfCacheBlocks = numberOfCacheBlocks;
@@ -104,12 +106,12 @@ public class Server implements ServerListener
   }
 
   @Override
-  public synchronized void registered(SelectionKey key)
+  public void registered(SelectionKey key)
   {
-    ServerSocketChannel channel = (ServerSocketChannel)key.channel();
-    address = (InetSocketAddress)channel.socket().getLocalSocketAddress();
-    logger.info("Server started listening at {}", address);
-    notifyAll();
+    super.registered(key);
+    logger.info("Server started listening at {}", getServerAddress());
+    latch.countDown();
+    latch = null;
   }
 
   @Override
@@ -119,7 +121,7 @@ public class Server implements ServerListener
       ln.boot();
     }
     /*
-     * There may be unregister tasks scheduled to run on the event loop that 
use serverHelperExecutor.
+     * There may be un-register tasks scheduled to run on the event loop that 
use serverHelperExecutor.
      */
     eventloop.submit(new Runnable()
     {
@@ -130,27 +132,100 @@ public class Server implements ServerListener
         storageHelperExecutor.shutdown();
         try {
           serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
+          storageHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
         } catch (InterruptedException ex) {
           logger.debug("Executor Termination", ex);
         }
-        logger.info("Server stopped listening at {}", address);
+        logger.info("Server stopped listening at {}", getServerAddress());
+        latch.countDown();
+        latch = null;
       }
     });
   }
 
-  public synchronized InetSocketAddress run(EventLoop eventloop)
+  public InetSocketAddress run()
   {
+    final CountDownLatch latch = new CountDownLatch(1);
+    this.latch = latch;
     eventloop.start(null, port, this);
-    while (address == null) {
-      try {
-        wait(20);
-      } catch (InterruptedException ex) {
-        throw new RuntimeException(ex);
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+    return (InetSocketAddress)getServerAddress();
+  }
+
+  public InetSocketAddress run(long time)
+  {
+    if (time < 0) {
+      throw new IllegalArgumentException(String.format("Wait time %d can't be 
negative", time));
+    }
+    final CountDownLatch latch = new CountDownLatch(1);
+    this.latch = latch;
+    eventloop.start(null, port, this);
+    final long deadline = System.currentTimeMillis() + time;
+    try {
+      while (latch.getCount() != 0 && time > 0 && latch.await(time, 
TimeUnit.MILLISECONDS)) {
+        time = deadline - System.currentTimeMillis();
       }
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
     }
+    return (InetSocketAddress)getServerAddress();
+  }
 
-    this.eventloop = eventloop;
-    return address;
+  public void stop()
+  {
+    final CountDownLatch latch = new CountDownLatch(1);
+    this.latch = latch;
+    eventloop.stop(this);
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      shutdownExecutors(latch.getCount() == 0);
+    }
+  }
+
+  public void stop(long time)
+  {
+    if (time < 0) {
+      throw new IllegalArgumentException(String.format("Wait time %d can't be 
negative", time));
+    }
+    final CountDownLatch latch = new CountDownLatch(1);
+    this.latch = latch;
+    eventloop.stop(this);
+    final long deadline = System.currentTimeMillis() + time;
+    try {
+      while (latch.getCount() != 0 && time > 0 && latch.await(time, 
TimeUnit.MILLISECONDS)) {
+        time = deadline - System.currentTimeMillis();
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      shutdownExecutors(latch.getCount() == 0);
+    }
+  }
+
+  private void shutdownExecutors(boolean isTerminated)
+  {
+    if (!isTerminated) {
+      logger.warn("Buffer server {} did not terminate.", this);
+      try {
+        if (!serverHelperExecutor.isTerminated()) {
+          logger.warn("Forcing termination of {}", serverHelperExecutor);
+          serverHelperExecutor.shutdownNow();
+        }
+        if (!storageHelperExecutor.isTerminated()) {
+          logger.warn("Forcing termination of {}", storageHelperExecutor);
+          storageHelperExecutor.shutdownNow();
+        }
+      } catch (RuntimeException e) {
+        logger.error("Exception while terminating executors", e);
+      }
+    }
   }
 
   public void setAuthToken(byte[] authToken)
@@ -173,14 +248,15 @@ public class Server implements ServerListener
     }
 
     DefaultEventLoop eventloop = DefaultEventLoop.createEventLoop("alone");
-    eventloop.start(null, port, new Server(port));
-    new Thread(eventloop).start();
+    Thread thread = eventloop.start();
+    new Server(eventloop, port).run();
+    thread.join();
   }
 
   @Override
   public String toString()
   {
-    return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) 
+ "{address=" + address + "}";
+    return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) 
+ "{address=" + getServerAddress() + "}";
   }
 
   private final ConcurrentHashMap<String, DataList> publisherBuffers = new 
ConcurrentHashMap<>(1, 0.75f, 1);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
----------------------------------------------------------------------
diff --git 
a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
 
b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
index 234fb12..267aaa7 100644
--- 
a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
+++ 
b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java
@@ -66,8 +66,8 @@ public class SubscriberTest
     eventloopServer.start();
     eventloopClient.start();
 
-    instance = new Server(0, 64, 2);
-    address = instance.run(eventloopServer);
+    instance = new Server(eventloopServer, 0, 64, 2);
+    address = instance.run();
     assertTrue(address instanceof InetSocketAddress);
     assertFalse(address.isUnresolved());
   }
@@ -75,7 +75,7 @@ public class SubscriberTest
   @AfterClass
   public static void teardownServerAndClients()
   {
-    eventloopServer.stop(instance);
+    instance.stop();
     eventloopServer.stop();
     eventloopClient.stop();
   }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
----------------------------------------------------------------------
diff --git 
a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
 
b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
index b7d8de1..45e4147 100644
--- 
a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
+++ 
b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
@@ -70,8 +70,8 @@ public class ServerTest
     eventloopServer.start();
     eventloopClient.start();
 
-    instance = new Server(0, 4096,8);
-    address = instance.run(eventloopServer);
+    instance = new Server(eventloopServer, 0, 4096,8);
+    address = instance.run();
     assertTrue(address instanceof InetSocketAddress);
     assertFalse(address.isUnresolved());
 
@@ -83,7 +83,7 @@ public class ServerTest
   @AfterClass
   public static void teardownServerAndClients()
   {
-    eventloopServer.stop(instance);
+    instance.stop();
     eventloopServer.stop();
   }
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java
----------------------------------------------------------------------
diff --git 
a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java
 
b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java
index 86696f4..21168dd 100644
--- 
a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java
+++ 
b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java
@@ -60,10 +60,10 @@ public class DiskStorageTest
     eventloopClient = DefaultEventLoop.createEventLoop("client");
     eventloopClient.start();
 
-    instance = new Server(0, 1024, 8);
+    instance = new Server(eventloopServer, 0, 1024, 8);
     instance.setSpoolStorage(new DiskStorage());
 
-    address = instance.run(eventloopServer);
+    address = instance.run();
     assertFalse(address.isUnresolved());
 
     bsp = new Publisher("MyPublisher");
@@ -79,7 +79,7 @@ public class DiskStorageTest
   @AfterClass
   public static void teardownServerAndClients()
   {
-    eventloopServer.stop(instance);
+    instance.stop();
     eventloopClient.stop();
     eventloopServer.stop();
   }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java 
b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index 3ea969e..cfbd047 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -465,13 +465,13 @@ public class StramLocalCluster implements Runnable, 
Controller
   {
     if (!perContainerBufferServer) {
       StreamingContainer.eventloop.start();
-      bufferServer = new Server(0, 1024 * 1024, 8);
+      bufferServer = new Server(StreamingContainer.eventloop, 0, 1024 * 1024, 
8);
       try {
         bufferServer.setSpoolStorage(new DiskStorage());
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
-      bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, 
bufferServer.run(StreamingContainer.eventloop).getPort());
+      bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, 
bufferServer.run().getPort());
       LOG.info("Buffer server started: {}", bufferServerAddress);
     }
 
@@ -557,7 +557,7 @@ public class StramLocalCluster implements Runnable, 
Controller
 
     LOG.info("Application finished.");
     if (!perContainerBufferServer) {
-      StreamingContainer.eventloop.stop(bufferServer);
+      bufferServer.stop();
       StreamingContainer.eventloop.stop();
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java 
b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 437070c..e1e2ce8 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -155,7 +155,7 @@ public class StreamingContainer extends YarnContainerMain
   private long firstWindowMillis;
   private int windowWidthMillis;
   protected InetSocketAddress bufferServerAddress;
-  protected com.datatorrent.bufferserver.server.Server bufferServer;
+  protected Server bufferServer;
   private int checkpointWindowCount;
   private boolean fastPublisherSubscriber;
   private StreamingContainerContext containerContext;
@@ -224,12 +224,12 @@ public class StreamingContainer extends YarnContainerMain
           blockCount = bufferServerRAM / blocksize;
         }
         // start buffer server, if it was not set externally
-        bufferServer = new Server(0, blocksize * 1024 * 1024, blockCount);
+        bufferServer = new Server(eventloop, 0, blocksize * 1024 * 1024, 
blockCount);
         
bufferServer.setAuthToken(ctx.getValue(StreamingContainerContext.BUFFER_SERVER_TOKEN));
         if (ctx.getValue(Context.DAGContext.BUFFER_SPOOLING)) {
           bufferServer.setSpoolStorage(new DiskStorage());
         }
-        bufferServerAddress = 
NetUtils.getConnectAddress(bufferServer.run(eventloop));
+        bufferServerAddress = NetUtils.getConnectAddress(bufferServer.run());
         logger.debug("Buffer server started: {}", bufferServerAddress);
       }
     } catch (IOException ex) {
@@ -588,7 +588,7 @@ public class StreamingContainer extends YarnContainerMain
     }
 
     if (bufferServer != null) {
-      eventloop.stop(bufferServer);
+      bufferServer.stop();
       eventloop.stop();
     }
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java 
b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index 7d39e34..da5c7b7 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -373,8 +373,8 @@ public class GenericNodeTest
     EventLoop eventloop = 
DefaultEventLoop.createEventLoop("StreamTestEventLoop");
 
     ((DefaultEventLoop)eventloop).start();
-    final Server bufferServer = new Server(0); // find random port
-    final int bufferServerPort = bufferServer.run(eventloop).getPort();
+    final Server bufferServer = new Server(eventloop, 0); // find random port
+    final int bufferServerPort = bufferServer.run().getPort();
 
     final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
     final BlockingQueue<Object> tuples = new ArrayBlockingQueue<Object>(10);
@@ -478,7 +478,7 @@ public class GenericNodeTest
     Assert.assertEquals("Payload Tuple 3", 3, ((byte[])list.get(7))[5]);
 
     if (bufferServer != null) {
-      eventloop.stop(bufferServer);
+      bufferServer.stop();
     }
 
     ((DefaultEventLoop)eventloop).stop();

http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java 
b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
index 2d940fa..fd67121 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java
@@ -70,8 +70,8 @@ public class FastStreamTest
   public static void setup() throws InterruptedException, IOException, 
Exception
   {
     ((DefaultEventLoop)eventloop).start();
-    bufferServer = new Server(0); // find random port
-    InetSocketAddress bindAddr = bufferServer.run(eventloop);
+    bufferServer = new Server(eventloop, 0); // find random port
+    InetSocketAddress bindAddr = bufferServer.run();
     bufferServerPort = bindAddr.getPort();
   }
 
@@ -79,7 +79,7 @@ public class FastStreamTest
   public static void tearDown() throws IOException
   {
     if (bufferServer != null) {
-      eventloop.stop(bufferServer);
+      bufferServer.stop();
     }
     ((DefaultEventLoop)eventloop).stop();
   }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/075dd483/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java 
b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
index 4094f66..38460ea 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
@@ -81,8 +81,8 @@ public class SocketStreamTest
   public static void setup() throws InterruptedException, IOException, 
Exception
   {
     ((DefaultEventLoop)eventloop).start();
-    bufferServer = new Server(0); // find random port
-    InetSocketAddress bindAddr = bufferServer.run(eventloop);
+    bufferServer = new Server(eventloop, 0); // find random port
+    InetSocketAddress bindAddr = bufferServer.run();
     bufferServerPort = bindAddr.getPort();
   }
 
@@ -90,7 +90,7 @@ public class SocketStreamTest
   public static void tearDown() throws IOException
   {
     if (bufferServer != null) {
-      eventloop.stop(bufferServer);
+      bufferServer.stop();
     }
     ((DefaultEventLoop)eventloop).stop();
   }

Reply via email to