Repository: geode
Updated Branches:
  refs/heads/feature/GEODE-3416 [created] 56d7707e1


GEODE-3416: Reduce synchronization blockages in SocketCloser.
Remove synchronization blocks around HashMap. Replace that implementation
with simpler ThreadPool that is not unbounded and does not grow as the
number of remoteAddress (clients/peers) are added


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/56d7707e
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/56d7707e
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/56d7707e

Branch: refs/heads/feature/GEODE-3416
Commit: 56d7707e12ecdecbc1881a9ac1575fac854e8fe9
Parents: b06f69f
Author: Udo Kohlmeyer <ukohlme...@pivotal.io>
Authored: Wed Aug 9 14:17:59 2017 -0700
Committer: Udo Kohlmeyer <ukohlme...@pivotal.io>
Committed: Wed Aug 9 14:17:59 2017 -0700

----------------------------------------------------------------------
 .../cache/tier/sockets/CacheClientProxy.java    |  51 +-----
 .../apache/geode/internal/net/SocketCloser.java | 180 +++++++------------
 .../apache/geode/internal/tcp/Connection.java   |   4 +-
 .../geode/internal/tcp/ConnectionTable.java     |   4 -
 .../internal/net/SocketCloserJUnitTest.java     | 155 +++++-----------
 5 files changed, 119 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/56d7707e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index d7e3548..98bfed9 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -181,11 +181,7 @@ public class CacheClientProxy implements ClientSession {
    * True if we are connected to a client.
    */
   private volatile boolean connected = false;
-  // /**
-  // * A string representing interest in all keys
-  // */
-  // protected static final String ALL_KEYS = "ALL_KEYS";
-  //
+
   /**
    * True if a marker message is still in the ha queue.
    */
@@ -459,47 +455,6 @@ public class CacheClientProxy implements ClientSession {
     return this.proxyID;
   }
 
-  // the following code was commented out simply because it was not used
-  // /**
-  // * Determines if the proxy represents the client host (and only the host, 
not
-  // * necessarily the exact VM running on the host)
-  // *
-  // * @return Whether the proxy represents the client host
-  // */
-  // protected boolean representsClientHost(String clientHost)
-  // {
-  // // [bruce] TODO BUGBUGBUG: this should compare InetAddresses, not Strings
-  // return this._remoteHostAddress.equals(clientHost);
-  // }
-
-  // protected boolean representsClientVM(DistributedMember remoteMember)
-  // {
-  // // logger.warn("Is input port " + clientPort + " contained in " +
-  // // logger.warn("Does input host " + clientHost + " equal " +
-  // // this._remoteHostAddress+ ": " + representsClientHost(clientHost));
-  // // logger.warn("representsClientVM: " +
-  // // (representsClientHost(clientHost) && containsPort(clientPort)));
-  // return (proxyID.getDistributedMember().equals(remoteMember));
-  // }
-
-  // /**
-  // * Determines if the CacheClientUpdater proxied by this instance is 
listening
-  // * on the input clientHost and clientPort
-  // *
-  // * @param clientHost
-  // * The host name of the client to compare
-  // * @param clientPort
-  // * The port number of the client to compare
-  // *
-  // * @return Whether the CacheClientUpdater proxied by this instance is
-  // * listening on the input clientHost and clientPort
-  // */
-  // protected boolean representsCacheClientUpdater(String clientHost,
-  // int clientPort)
-  // {
-  // return (clientPort == this._socket.getPort() && 
representsClientHost(clientHost));
-  // }
-
   protected boolean isMember(ClientProxyMembershipID memberId) {
     return this.proxyID.equals(memberId);
   }
@@ -994,8 +949,7 @@ public class CacheClientProxy implements ClientSession {
   private void closeSocket() {
     if (this._socketClosed.compareAndSet(false, true)) {
       // Close the socket
-      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, 
this._remoteHostAddress,
-          null);
+      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, 
null);
       getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
     }
   }
@@ -1009,7 +963,6 @@ public class CacheClientProxy implements ClientSession {
     {
       String remoteHostAddress = this._remoteHostAddress;
       if (remoteHostAddress != null) {
-        
this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
         this._remoteHostAddress = null;
       }
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/56d7707e/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java 
b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
index 6d86fd8..d1f5aa4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
@@ -14,11 +14,15 @@
  */
 package org.apache.geode.internal.net;
 
+import org.apache.geode.SystemFailure;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.LoggingThreadGroup;
+import org.apache.logging.log4j.Logger;
+
 import java.io.IOException;
 import java.net.Socket;
-import java.util.HashMap;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -26,12 +30,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.SystemFailure;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.LoggingThreadGroup;
-
 /**
  * This class allows sockets to be closed without blocking. In some cases we 
have seen a call of
  * socket.close block for minutes. This class maintains a thread pool for 
every other member we have
@@ -51,28 +49,31 @@ public class SocketCloser {
    * minutes).
    */
   static final long ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS =
-      Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120).longValue();
+      Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120);
   /**
    * Maximum number of threads that can be doing a socket close. Any close 
requests over this max
    * will queue up waiting for a thread.
    */
-  static final int ASYNC_CLOSE_POOL_MAX_THREADS =
-      Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8).intValue();
+  private static final int ASYNC_CLOSE_POOL_MAX_THREADS =
+      Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8);
   /**
    * How many milliseconds the synchronous requester waits for the async close 
to happen. Default is
    * 0. Prior releases waited 50ms.
    */
-  static final long ASYNC_CLOSE_WAIT_MILLISECONDS =
-      Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue();
+  private static final long ASYNC_CLOSE_WAIT_MILLISECONDS =
+      Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0);
 
 
-  /** map of thread pools of async close threads */
-  private final HashMap<String, ThreadPoolExecutor> asyncCloseExecutors = new 
HashMap<>();
+  /**
+   * map of thread pools of async close threads
+   */
+
   private final long asyncClosePoolKeepAliveSeconds;
   private final int asyncClosePoolMaxThreads;
   private final long asyncCloseWaitTime;
   private final TimeUnit asyncCloseWaitUnits;
   private boolean closed;
+  private final ExecutorService socketCloserThreadPool;
 
   public SocketCloser() {
     this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, ASYNC_CLOSE_POOL_MAX_THREADS,
@@ -90,53 +91,25 @@ public class SocketCloser {
     this.asyncClosePoolMaxThreads = asyncClosePoolMaxThreads;
     this.asyncCloseWaitTime = asyncCloseWaitTime;
     this.asyncCloseWaitUnits = asyncCloseWaitUnits;
+
+    final ThreadGroup threadGroup =
+        LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
+    ThreadFactory threadFactory = command -> {
+      Thread thread = new Thread(threadGroup, command);
+      thread.setDaemon(true);
+      return thread;
+    };
+    socketCloserThreadPool = new 
ThreadPoolExecutor(this.asyncClosePoolMaxThreads,
+        this.asyncClosePoolMaxThreads, this.asyncClosePoolKeepAliveSeconds, 
TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(), threadFactory);
   }
 
   public int getMaxThreads() {
     return this.asyncClosePoolMaxThreads;
   }
 
-  private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
-    synchronized (asyncCloseExecutors) {
-      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
-      if (pool == null) {
-        final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket 
asyncClose", logger);
-        ThreadFactory tf = new ThreadFactory() {
-          public Thread newThread(final Runnable command) {
-            Thread thread = new Thread(tg, command);
-            thread.setDaemon(true);
-            return thread;
-          }
-        };
-        BlockingQueue<Runnable> workQueue = new 
LinkedBlockingQueue<Runnable>();
-        pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, 
this.asyncClosePoolMaxThreads,
-            this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, 
tf);
-        pool.allowCoreThreadTimeOut(true);
-        asyncCloseExecutors.put(address, pool);
-      }
-      return pool;
-    }
-  }
-
-  /**
-   * Call this method if you know all the resources in the closer for the 
given address are no
-   * longer needed. Currently a thread pool is kept for each address and if 
you know that an address
-   * no longer needs its pool then you should call this method.
-   */
-  public void releaseResourcesForAddress(String address) {
-    synchronized (asyncCloseExecutors) {
-      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
-      if (pool != null) {
-        pool.shutdown();
-        asyncCloseExecutors.remove(address);
-      }
-    }
-  }
-
   private boolean isClosed() {
-    synchronized (asyncCloseExecutors) {
-      return this.closed;
-    }
+    return this.closed;
   }
 
   /**
@@ -144,34 +117,9 @@ public class SocketCloser {
    * called then the asyncClose will be done synchronously.
    */
   public void close() {
-    synchronized (asyncCloseExecutors) {
-      if (!this.closed) {
-        this.closed = true;
-        for (ThreadPoolExecutor pool : asyncCloseExecutors.values()) {
-          pool.shutdown();
-        }
-        asyncCloseExecutors.clear();
-      }
-    }
-  }
-
-  private void asyncExecute(String address, Runnable r) {
-    // Waiting 50ms for the async close request to complete is what the old 
(close per thread)
-    // code did. But now that we will not create a thread for every close 
request
-    // it seems better to let the thread that requested the close to move on 
quickly.
-    // So the default has changed to not wait. The system property 
p2p.ASYNC_CLOSE_WAIT_MILLISECONDS
-    // can be set to how many milliseconds to wait.
-    if (this.asyncCloseWaitTime == 0) {
-      getAsyncThreadExecutor(address).execute(r);
-    } else {
-      Future<?> future = getAsyncThreadExecutor(address).submit(r);
-      try {
-        future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
-      } catch (InterruptedException | ExecutionException | TimeoutException e) 
{
-        // We want this code to wait at most 50ms for the close to happen.
-        // It is ok to ignore these exception and let the close continue
-        // in the background.
-      }
+    if (!this.closed) {
+      this.closed = true;
+      socketCloserThreadPool.shutdown();
     }
   }
 
@@ -181,34 +129,40 @@ public class SocketCloser {
    * this method may block for a certain amount of time. If it is called after 
the SocketCloser is
    * closed then a normal synchronous close is done.
    * 
-   * @param sock the socket to close
-   * @param address identifies who the socket is connected to
-   * @param extra an optional Runnable with stuff to execute in the async 
thread
+   * @param socket the socket to close
+   * @param runnableCode an optional Runnable with stuff to execute in the 
async thread
    */
-  public void asyncClose(final Socket sock, final String address, final 
Runnable extra) {
-    if (sock == null || sock.isClosed()) {
+  public void asyncClose(final Socket socket, final Runnable runnableCode) {
+    if (socket == null || socket.isClosed()) {
       return;
     }
+
     boolean doItInline = false;
     try {
-      synchronized (asyncCloseExecutors) {
-        if (isClosed()) {
-          // this SocketCloser has been closed so do a synchronous, inline, 
close
-          doItInline = true;
-        } else {
-          asyncExecute(address, new Runnable() {
-            public void run() {
-              Thread.currentThread().setName("AsyncSocketCloser for " + 
address);
-              try {
-                if (extra != null) {
-                  extra.run();
-                }
-                inlineClose(sock);
-              } finally {
-                Thread.currentThread().setName("unused AsyncSocketCloser");
+      if (isClosed()) {
+        // this SocketCloser has been closed so do a synchronous, inline, close
+        doItInline = true;
+      } else {
+        socketCloserThreadPool.execute(() -> {
+          if (runnableCode != null) {
+            runnableCode.run();
+          }
+          inlineClose(socket);
+        });
+        if (this.asyncCloseWaitTime != 0) {
+          try {
+            Future future = socketCloserThreadPool.submit(() -> {
+              if (runnableCode != null) {
+                runnableCode.run();
               }
-            }
-          });
+              inlineClose(socket);
+            });
+            future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
+          } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
+            // We want this code to wait at most 50ms for the close to happen.
+            // It is ok to ignore these exception and let the close continue
+            // in the background.
+          }
         }
       }
     } catch (OutOfMemoryError ignore) {
@@ -217,10 +171,10 @@ public class SocketCloser {
       doItInline = true;
     }
     if (doItInline) {
-      if (extra != null) {
-        extra.run();
+      if (runnableCode != null) {
+        runnableCode.run();
       }
-      inlineClose(sock);
+      inlineClose(socket);
     }
   }
 
@@ -228,19 +182,19 @@ public class SocketCloser {
   /**
    * Closes the specified socket
    * 
-   * @param sock the socket to close
+   * @param socket the socket to close
    */
-  private static void inlineClose(final Socket sock) {
+  private void inlineClose(final Socket socket) {
     // the next two statements are a mad attempt to fix bug
     // 36041 - segv in jrockit in pthread signaling code. This
     // seems to alleviate the problem.
     try {
-      sock.shutdownInput();
-      sock.shutdownOutput();
+      socket.shutdownInput();
+      socket.shutdownOutput();
     } catch (Exception e) {
     }
     try {
-      sock.close();
+      socket.close();
     } catch (IOException ignore) {
     } catch (VirtualMachineError err) {
       SystemFailure.initiateFailure(err);

http://git-wip-us.apache.org/repos/asf/geode/blob/56d7707e/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 0ecb3bf..954a33c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -562,7 +562,7 @@ public class Connection implements Runnable {
       } catch (IOException io) {
         logger.fatal(LocalizedMessage
             
.create(LocalizedStrings.Connection_UNABLE_TO_GET_P2P_CONNECTION_STREAMS), io);
-        t.getSocketCloser().asyncClose(socket, this.remoteAddr.toString(), 
null);
+        t.getSocketCloser().asyncClose(socket, null);
         throw io;
       }
     }
@@ -847,7 +847,7 @@ public class Connection implements Runnable {
         Socket s = this.socket;
         if (s != null && !s.isClosed()) {
           prepareForAsyncClose();
-          this.owner.getSocketCloser().asyncClose(s, 
String.valueOf(this.remoteAddr), null);
+          this.owner.getSocketCloser().asyncClose(s, null);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/56d7707e/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 044ab42..11c3bb3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -929,10 +929,6 @@ public class ConnectionTable {
               owner.getDM().getMembershipManager().getShutdownCause());
         }
       }
-
-      if (remoteAddress != null) {
-        this.socketCloser.releaseResourcesForAddress(remoteAddress.toString());
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/56d7707e/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
index 942cad4..b6dbfe2 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
@@ -14,22 +14,21 @@
  */
 package org.apache.geode.internal.net;
 
-import static org.junit.Assert.*;
-
-import java.net.Socket;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.internal.net.SocketCloser;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.junit.categories.UnitTest;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Tests the default SocketCloser.
@@ -62,86 +61,49 @@ public class SocketCloserJUnitTest {
    */
   @Test
   public void testAsync() {
-    final CountDownLatch cdl = new CountDownLatch(1);
+    final CountDownLatch countDownLatch = new CountDownLatch(1);
     final AtomicInteger waitingToClose = new AtomicInteger(0);
-    Runnable r = new Runnable() {
-      @Override
-      public void run() {
-        try {
-          waitingToClose.incrementAndGet();
-          cdl.await();
-        } catch (InterruptedException e) {
-        }
-      }
-    };
 
     final int SOCKET_COUNT = 100;
-    final Socket[] aSockets = new Socket[SOCKET_COUNT];
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      aSockets[i] = createClosableSocket();
-    }
-    // Schedule a 100 sockets for async close.
-    // They should all be stuck on cdl.
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      this.socketCloser.asyncClose(aSockets[i], "A", r);
-    }
-    // Make sure the sockets have not been closed
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      assertEquals(false, aSockets[i].isClosed());
-    }
-    final Socket[] bSockets = new Socket[SOCKET_COUNT];
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      bSockets[i] = createClosableSocket();
-    }
+    final int REMOTE_CLIENT_COUNT = 200;
+
+    List<Socket> trackedSockets = new ArrayList<>();
     // Schedule a 100 sockets for async close.
-    // They should all be stuck on cdl.
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      this.socketCloser.asyncClose(bSockets[i], "B", r);
-    }
-    // Make sure the sockets have not been closed
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      assertEquals(false, bSockets[i].isClosed());
+    // They should all be stuck on countDownLatch.
+    for (int i = 0; i < REMOTE_CLIENT_COUNT; i++) {
+      Socket[] aSockets = new Socket[SOCKET_COUNT];
+
+      for (int j = 0; j < SOCKET_COUNT; j++) {
+        aSockets[j] = createClosableSocket();
+        trackedSockets.add(aSockets[j]);
+        this.socketCloser.asyncClose(aSockets[j], () -> {
+          try {
+            waitingToClose.incrementAndGet();
+            countDownLatch.await();
+          } catch (InterruptedException e) {
+          }
+        });
+      }
     }
+
     // close the socketCloser first to verify that the sockets
     // that have already been scheduled will be still be closed.
-    this.socketCloser.releaseResourcesForAddress("A");
     this.socketCloser.close();
-    // Each thread pool (one for A and one for B) has a max of 8 threads.
-    // So verify that this many are currently waiting on cdl.
-    {
-      final int maxThreads = this.socketCloser.getMaxThreads();
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          return waitingToClose.get() == 2 * maxThreads;
-        }
-
-        public String description() {
-          return "expected " + 2 * maxThreads + " waiters but found only " + 
waitingToClose.get();
-        }
-      };
-      Wait.waitForCriterion(wc, 5000, 10, true);
-    }
-    // now count down the latch that allows the sockets to close
-    cdl.countDown();
+    countDownLatch.countDown();
     // now all the sockets should get closed; use a wait criteria
     // since a thread pool is doing to closes
-    {
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          for (int i = 0; i < SOCKET_COUNT; i++) {
-            if (!aSockets[i].isClosed() || !bSockets[i].isClosed()) {
-              return false;
-            }
-          }
-          return true;
+    Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
+      boolean areAllClosed = true;
+      for (Iterator<Socket> iterator = trackedSockets.iterator(); 
iterator.hasNext();) {
+        Socket socket = iterator.next();
+        if (socket.isClosed()) {
+          iterator.remove();
+          continue;
         }
-
-        public String description() {
-          return "one or more sockets did not close";
-        }
-      };
-      Wait.waitForCriterion(wc, 5000, 10, true);
-    }
+        areAllClosed = false;
+      }
+      return areAllClosed;
+    });
   }
 
   /**
@@ -150,18 +112,11 @@ public class SocketCloserJUnitTest {
   @Test
   public void testClosedSocket() throws Exception {
     final AtomicBoolean runnableCalled = new AtomicBoolean();
-    Runnable r = new Runnable() {
-      @Override
-      public void run() {
-        runnableCalled.set(true);
-      }
-    };
 
     Socket s = createClosableSocket();
     s.close();
-    this.socketCloser.asyncClose(s, "A", r);
-    Wait.pause(10);
-    assertEquals(false, runnableCalled.get());
+    this.socketCloser.asyncClose(s, () -> runnableCalled.set(true));
+    Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> 
!runnableCalled.get());
   }
 
   /**
@@ -170,25 +125,11 @@ public class SocketCloserJUnitTest {
   @Test
   public void testClosedSocketCloser() {
     final AtomicBoolean runnableCalled = new AtomicBoolean();
-    Runnable r = new Runnable() {
-      @Override
-      public void run() {
-        runnableCalled.set(true);
-      }
-    };
 
-    final Socket s = createClosableSocket();
+    final Socket closableSocket = createClosableSocket();
     this.socketCloser.close();
-    this.socketCloser.asyncClose(s, "A", r);
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        return runnableCalled.get() && s.isClosed();
-      }
-
-      public String description() {
-        return "runnable was not called or socket was not closed";
-      }
-    };
-    Wait.waitForCriterion(wc, 5000, 10, true);
+    this.socketCloser.asyncClose(closableSocket, () -> 
runnableCalled.set(true));
+    Awaitility.await().atMost(5, TimeUnit.SECONDS)
+        .until(() -> runnableCalled.get() && closableSocket.isClosed());
   }
 }

Reply via email to