GEODE-3416: Reduce synchronization blockages in SocketCloser.
Remove synchronization blocks around HashMap. Replace that implementation
with simpler ThreadPool that is not unbounded and does not grow as the
number of remoteAddress (clients/peers) are added


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/31e4db0f
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/31e4db0f
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/31e4db0f

Branch: refs/heads/feature/GEODE-3416
Commit: 31e4db0f7630c4350519c29f2ce4061fc0c2b1ee
Parents: 7352fcc
Author: Dave Barnes <dbar...@pivotal.io>
Authored: Thu Aug 10 17:11:50 2017 -0700
Committer: Udo Kohlmeyer <ukohlme...@pivotal.io>
Committed: Fri Aug 11 09:46:15 2017 -0700

----------------------------------------------------------------------
 .../master_middleman/bookbinder_helpers.rb      | 298 -------------------
 .../cache/tier/sockets/CacheClientProxy.java    |  51 +---
 .../apache/geode/internal/net/SocketCloser.java | 176 ++++-------
 .../apache/geode/internal/tcp/Connection.java   |   4 +-
 .../geode/internal/tcp/ConnectionTable.java     |   4 -
 .../internal/net/SocketCloserJUnitTest.java     | 155 +++-------
 6 files changed, 115 insertions(+), 573 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/31e4db0f/geode-book/master_middleman/bookbinder_helpers.rb
----------------------------------------------------------------------
diff --git a/geode-book/master_middleman/bookbinder_helpers.rb 
b/geode-book/master_middleman/bookbinder_helpers.rb
deleted file mode 100644
index 817875c..0000000
--- a/geode-book/master_middleman/bookbinder_helpers.rb
+++ /dev/null
@@ -1,298 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-require 'bookbinder/code_example_reader'
-require 'bookbinder/ingest/cloner_factory'
-require 'bookbinder/ingest/git_accessor'
-require 'bookbinder/local_filesystem_accessor'
-require 'date'
-require_relative 'archive_drop_down_menu'
-require_relative 'quicklinks_renderer'
-
-I18n.enforce_available_locales = false
-
-module Bookbinder
-  class Helpers < ::Middleman::Extension
-    # class << self
-    #   def registered(app)
-    #     app.helpers HelperMethods
-    #   end
-
-    #   alias :included :registered
-    # end
-
-    module HelperMethods
-
-      def yield_for_code_snippet(from: nil, at: nil)
-        cloner_factory = Ingest::ClonerFactory.new({out: $stdout},
-                                                   LocalFilesystemAccessor.new,
-                                                   Ingest::GitAccessor.new)
-
-        cloner = cloner_factory.produce(config[:local_repo_dir])
-        code_example_reader = CodeExampleReader.new({out: $stdout},
-                                                    
LocalFilesystemAccessor.new)
-        working_copy = cloner.call(source_repo_name: from,
-                                   source_ref: 'master',
-                                   destination_parent_dir: config[:workspace])
-
-        snippet, language = 
code_example_reader.get_snippet_and_language_at(at, working_copy)
-
-        delimiter = '```'
-
-        snippet.prepend("#{delimiter}#{language}\n").concat("\n#{delimiter}")
-      end
-
-      def elastic_search?
-        !!config[:elastic_search]
-      end
-
-      def yield_for_subnav
-        partial "subnavs/#{subnav_template_name}"
-      end
-
-      def yield_for_archive_drop_down_menu
-        menu = ArchiveDropDownMenu.new(
-          config[:archive_menu],
-          current_path: current_page.path
-        )
-        unless menu.empty?
-          partial 'archive_menus/default', locals: { menu_title: menu.title,
-                                                     dropdown_links: 
menu.dropdown_links }
-        end
-      end
-
-      def exclude_feedback
-        current_page.add_metadata({page: {feedback_disabled: true}})
-      end
-
-      def yield_for_feedback
-        partial 'layouts/feedback' if config[:feedback_enabled] && 
!current_page.metadata[:page][:feedback_disabled]
-      end
-
-      def exclude_repo_link
-        current_page.add_metadata({page: {repo_link_disabled: true}})
-      end
-
-      def render_repo_link
-        if config[:repo_link_enabled] && repo_url && 
!current_page.metadata[:page][:repo_link_disabled]
-          "<a id='repo-link' href='#{repo_url}'>View the source for this page 
in GitHub</a>"
-        end
-      end
-
-      def mermaid_diagram(&blk)
-        escaped_text = capture(&blk).gsub('-','\-')
-
-        @_out_buf.concat "<div class='mermaid'>#{escaped_text}</div>"
-      end
-
-      def modified_date(default_date: nil)
-        parsed_default_date = Time.parse(default_date).utc if default_date
-
-        date = page_last_modified_date || parsed_default_date
-
-        "Page last updated: <span data-behavior=\"DisplayModifiedDate\" 
data-modified-date=\"#{date.to_i}000\"></span>" if date
-      end
-
-      def breadcrumbs
-        page_chain = add_ancestors_of(current_page, [])
-        breadcrumbs = page_chain.map do |page|
-          make_breadcrumb(page, page == current_page)
-        end.compact
-        return if breadcrumbs.size < 2
-        return content_tag :ul, breadcrumbs.reverse.join(' '), class: 
'breadcrumbs'
-      end
-
-      def vars
-        OpenStruct.new config[:template_variables]
-      end
-
-      ## Geode helpers (start)
-      def geode_product_name
-        current_page.data.title= vars.geode_product_name
-      end
-
-      def geode_product_name_long
-        current_page.data.title= vars.geode_product_name_long
-      end
-
-      def geode_product_version
-        current_page.data.title= vars.geode_product_version
-      end
-
-      def set_title(*args)
-        current_page.data.title= args.join(' ')
-      end
-      ## Geode helpers (end)
-
-      def product_info
-        config[:product_info].fetch(template_key, {})
-      end
-
-      def production_host
-        config[:production_host]
-      end
-
-      def quick_links
-        page_src = File.read(current_page.source_file)
-        quicklinks_renderer = QuicklinksRenderer.new(vars)
-        Redcarpet::Markdown.new(quicklinks_renderer).render(page_src)
-      end
-
-      def owners
-        html_resources = sitemap.resources.select { |r| 
r.path.end_with?('.html') }
-        html_resources.each.with_object({}) { |resource, owners|
-          owners[resource.path] = Array(resource.data['owner'])
-        }
-      end
-
-      def template_key
-        decreasingly_specific_namespaces.detect { |ns|
-          config[:subnav_templates].has_key?(ns)
-        }
-      end
-
-      def body_classes(path=current_path.dup, options={})
-        if path.is_a? Hash
-          options = path
-          path = current_path.dup
-        end
-        basename = File.basename(path)
-        dirname = File.dirname(path).gsub('.', '_')
-        page_classes(File.join(dirname, basename), options)
-      end
-
-      private
-
-      def subnav_template_name
-        config[:subnav_templates][template_key] || 'default'
-      end
-
-      def decreasingly_specific_namespaces
-        body_classes(numeric_prefix: numeric_class_prefix).
-          split(' ').reverse.drop(1).
-          map {|ns| ns.sub(/^#{numeric_class_prefix}/, '')}
-      end
-
-      def numeric_class_prefix
-        'NUMERIC_CLASS_PREFIX'
-      end
-
-      def page_last_modified_date
-        git_accessor = Ingest::GitAccessor.new
-
-        current_date = if current_page.data.dita
-          
git_accessor.author_date(preprocessing_path(current_page.source_file), dita: 
true)
-        else
-          git_accessor.author_date(current_page.source_file)
-        end
-
-        current_date.utc if current_date
-      end
-
-      def repo_url
-        nested_dir, filename = parse_out_nested_dir_and_filename
-
-        repo_dir = match_repo_dir(nested_dir)
-        page_repo_config = config[:repo_links][repo_dir]
-
-        if page_repo_config && page_repo_config['ref']
-          org_repo = Pathname(page_repo_config['repo'])
-          ref = Pathname(page_repo_config['ref'])
-          at_path = at_path(page_repo_config)
-          nested_dir = extract_nested_directory(nested_dir, repo_dir)
-
-          "http://github.com/#{org_repo.join(Pathname('tree'), ref, 
Pathname(nested_dir), at_path, source_file(filename))}"
-        end
-      end
-
-      def match_repo_dir(nested_dir)
-        config[:repo_links].keys
-          .select{ |key| nested_dir.match(/^#{key}/) }
-          .sort_by{ |key| key.length }
-          .last
-      end
-
-      def source_file(filename)
-        fs = LocalFilesystemAccessor.new
-
-        if current_page.data.dita
-          source_filename = "#{filename}.xml"
-
-          if 
fs.source_file_exists?(Pathname(preprocessing_path(current_page.source_file)).dirname,
-                                             source_filename)
-            source_filename
-          else
-            ''
-          end
-        else
-          "#{filename}.html.md.erb"
-        end
-      end
-
-      def preprocessing_path(current_source_path)
-        root_path, nested_repo_path = current_source_path.split('source')
-
-        root_path.gsub!('/output/master_middleman', '')
-
-        "#{root_path}output/preprocessing/sections#{nested_repo_path}"
-      end
-
-      def parse_out_nested_dir_and_filename
-        current_page.path
-          .match(/\/?(.*?)\/?([^\/]*)\.html$?/)
-          .captures
-      end
-
-      def extract_nested_directory(nested_dir, repo_dir)
-        nested_dir = nested_dir.gsub("#{repo_dir}", '')
-        nested_dir = nested_dir.sub('/', '') if nested_dir[0] == '/'
-
-        nested_dir
-      end
-
-      def at_path(page_repo_config)
-        path = page_repo_config['at_path'] || ""
-
-        Pathname(path)
-      end
-
-      def add_ancestors_of(page, ancestors)
-        if page
-          add_ancestors_of(page.parent, ancestors + [page])
-        else
-          ancestors
-        end
-      end
-
-      def make_breadcrumb(page, is_current_page)
-        return nil unless (text = page.data.breadcrumb || page.data.title)
-        if is_current_page
-          css_class = 'active'
-          link = content_tag :span, text
-        else
-          link = link_to(text, '/' + page.path)
-        end
-        content_tag :li, link, :class => css_class
-      end
-    end
-    
-    helpers HelperMethods
-    
-
-  end
-end
-::Middleman::Extensions.register(:bookbinder, Bookbinder::Helpers)

http://git-wip-us.apache.org/repos/asf/geode/blob/31e4db0f/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index d7e3548..98bfed9 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -181,11 +181,7 @@ public class CacheClientProxy implements ClientSession {
    * True if we are connected to a client.
    */
   private volatile boolean connected = false;
-  // /**
-  // * A string representing interest in all keys
-  // */
-  // protected static final String ALL_KEYS = "ALL_KEYS";
-  //
+
   /**
    * True if a marker message is still in the ha queue.
    */
@@ -459,47 +455,6 @@ public class CacheClientProxy implements ClientSession {
     return this.proxyID;
   }
 
-  // the following code was commented out simply because it was not used
-  // /**
-  // * Determines if the proxy represents the client host (and only the host, 
not
-  // * necessarily the exact VM running on the host)
-  // *
-  // * @return Whether the proxy represents the client host
-  // */
-  // protected boolean representsClientHost(String clientHost)
-  // {
-  // // [bruce] TODO BUGBUGBUG: this should compare InetAddresses, not Strings
-  // return this._remoteHostAddress.equals(clientHost);
-  // }
-
-  // protected boolean representsClientVM(DistributedMember remoteMember)
-  // {
-  // // logger.warn("Is input port " + clientPort + " contained in " +
-  // // logger.warn("Does input host " + clientHost + " equal " +
-  // // this._remoteHostAddress+ ": " + representsClientHost(clientHost));
-  // // logger.warn("representsClientVM: " +
-  // // (representsClientHost(clientHost) && containsPort(clientPort)));
-  // return (proxyID.getDistributedMember().equals(remoteMember));
-  // }
-
-  // /**
-  // * Determines if the CacheClientUpdater proxied by this instance is 
listening
-  // * on the input clientHost and clientPort
-  // *
-  // * @param clientHost
-  // * The host name of the client to compare
-  // * @param clientPort
-  // * The port number of the client to compare
-  // *
-  // * @return Whether the CacheClientUpdater proxied by this instance is
-  // * listening on the input clientHost and clientPort
-  // */
-  // protected boolean representsCacheClientUpdater(String clientHost,
-  // int clientPort)
-  // {
-  // return (clientPort == this._socket.getPort() && 
representsClientHost(clientHost));
-  // }
-
   protected boolean isMember(ClientProxyMembershipID memberId) {
     return this.proxyID.equals(memberId);
   }
@@ -994,8 +949,7 @@ public class CacheClientProxy implements ClientSession {
   private void closeSocket() {
     if (this._socketClosed.compareAndSet(false, true)) {
       // Close the socket
-      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, 
this._remoteHostAddress,
-          null);
+      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, 
null);
       getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
     }
   }
@@ -1009,7 +963,6 @@ public class CacheClientProxy implements ClientSession {
     {
       String remoteHostAddress = this._remoteHostAddress;
       if (remoteHostAddress != null) {
-        
this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
         this._remoteHostAddress = null;
       }
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/31e4db0f/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java 
b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
index 6d86fd8..fbbe797 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
@@ -14,11 +14,15 @@
  */
 package org.apache.geode.internal.net;
 
+import org.apache.geode.SystemFailure;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.LoggingThreadGroup;
+import org.apache.logging.log4j.Logger;
+
 import java.io.IOException;
 import java.net.Socket;
-import java.util.HashMap;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -26,12 +30,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.SystemFailure;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.LoggingThreadGroup;
-
 /**
  * This class allows sockets to be closed without blocking. In some cases we 
have seen a call of
  * socket.close block for minutes. This class maintains a thread pool for 
every other member we have
@@ -51,28 +49,27 @@ public class SocketCloser {
    * minutes).
    */
   static final long ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS =
-      Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120).longValue();
+      Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120);
   /**
    * Maximum number of threads that can be doing a socket close. Any close 
requests over this max
    * will queue up waiting for a thread.
    */
-  static final int ASYNC_CLOSE_POOL_MAX_THREADS =
-      Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8).intValue();
+  private static final int ASYNC_CLOSE_POOL_MAX_THREADS =
+      Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8);
   /**
    * How many milliseconds the synchronous requester waits for the async close 
to happen. Default is
    * 0. Prior releases waited 50ms.
    */
-  static final long ASYNC_CLOSE_WAIT_MILLISECONDS =
-      Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue();
+  private static final long ASYNC_CLOSE_WAIT_MILLISECONDS =
+      Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0);
 
 
-  /** map of thread pools of async close threads */
-  private final HashMap<String, ThreadPoolExecutor> asyncCloseExecutors = new 
HashMap<>();
   private final long asyncClosePoolKeepAliveSeconds;
   private final int asyncClosePoolMaxThreads;
   private final long asyncCloseWaitTime;
   private final TimeUnit asyncCloseWaitUnits;
   private boolean closed;
+  private final ExecutorService socketCloserThreadPool;
 
   public SocketCloser() {
     this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, ASYNC_CLOSE_POOL_MAX_THREADS,
@@ -90,53 +87,25 @@ public class SocketCloser {
     this.asyncClosePoolMaxThreads = asyncClosePoolMaxThreads;
     this.asyncCloseWaitTime = asyncCloseWaitTime;
     this.asyncCloseWaitUnits = asyncCloseWaitUnits;
+
+    final ThreadGroup threadGroup =
+        LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
+    ThreadFactory threadFactory = command -> {
+      Thread thread = new Thread(threadGroup, command);
+      thread.setDaemon(true);
+      return thread;
+    };
+    socketCloserThreadPool = new 
ThreadPoolExecutor(this.asyncClosePoolMaxThreads,
+        this.asyncClosePoolMaxThreads, this.asyncClosePoolKeepAliveSeconds, 
TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(), threadFactory);
   }
 
   public int getMaxThreads() {
     return this.asyncClosePoolMaxThreads;
   }
 
-  private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
-    synchronized (asyncCloseExecutors) {
-      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
-      if (pool == null) {
-        final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket 
asyncClose", logger);
-        ThreadFactory tf = new ThreadFactory() {
-          public Thread newThread(final Runnable command) {
-            Thread thread = new Thread(tg, command);
-            thread.setDaemon(true);
-            return thread;
-          }
-        };
-        BlockingQueue<Runnable> workQueue = new 
LinkedBlockingQueue<Runnable>();
-        pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, 
this.asyncClosePoolMaxThreads,
-            this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, 
tf);
-        pool.allowCoreThreadTimeOut(true);
-        asyncCloseExecutors.put(address, pool);
-      }
-      return pool;
-    }
-  }
-
-  /**
-   * Call this method if you know all the resources in the closer for the 
given address are no
-   * longer needed. Currently a thread pool is kept for each address and if 
you know that an address
-   * no longer needs its pool then you should call this method.
-   */
-  public void releaseResourcesForAddress(String address) {
-    synchronized (asyncCloseExecutors) {
-      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
-      if (pool != null) {
-        pool.shutdown();
-        asyncCloseExecutors.remove(address);
-      }
-    }
-  }
-
   private boolean isClosed() {
-    synchronized (asyncCloseExecutors) {
-      return this.closed;
-    }
+    return this.closed;
   }
 
   /**
@@ -144,34 +113,9 @@ public class SocketCloser {
    * called then the asyncClose will be done synchronously.
    */
   public void close() {
-    synchronized (asyncCloseExecutors) {
-      if (!this.closed) {
-        this.closed = true;
-        for (ThreadPoolExecutor pool : asyncCloseExecutors.values()) {
-          pool.shutdown();
-        }
-        asyncCloseExecutors.clear();
-      }
-    }
-  }
-
-  private void asyncExecute(String address, Runnable r) {
-    // Waiting 50ms for the async close request to complete is what the old 
(close per thread)
-    // code did. But now that we will not create a thread for every close 
request
-    // it seems better to let the thread that requested the close to move on 
quickly.
-    // So the default has changed to not wait. The system property 
p2p.ASYNC_CLOSE_WAIT_MILLISECONDS
-    // can be set to how many milliseconds to wait.
-    if (this.asyncCloseWaitTime == 0) {
-      getAsyncThreadExecutor(address).execute(r);
-    } else {
-      Future<?> future = getAsyncThreadExecutor(address).submit(r);
-      try {
-        future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
-      } catch (InterruptedException | ExecutionException | TimeoutException e) 
{
-        // We want this code to wait at most 50ms for the close to happen.
-        // It is ok to ignore these exception and let the close continue
-        // in the background.
-      }
+    if (!this.closed) {
+      this.closed = true;
+      socketCloserThreadPool.shutdown();
     }
   }
 
@@ -181,34 +125,40 @@ public class SocketCloser {
    * this method may block for a certain amount of time. If it is called after 
the SocketCloser is
    * closed then a normal synchronous close is done.
    * 
-   * @param sock the socket to close
-   * @param address identifies who the socket is connected to
-   * @param extra an optional Runnable with stuff to execute in the async 
thread
+   * @param socket the socket to close
+   * @param runnableCode an optional Runnable with stuff to execute in the 
async thread
    */
-  public void asyncClose(final Socket sock, final String address, final 
Runnable extra) {
-    if (sock == null || sock.isClosed()) {
+  public void asyncClose(final Socket socket, final Runnable runnableCode) {
+    if (socket == null || socket.isClosed()) {
       return;
     }
+
     boolean doItInline = false;
     try {
-      synchronized (asyncCloseExecutors) {
-        if (isClosed()) {
-          // this SocketCloser has been closed so do a synchronous, inline, 
close
-          doItInline = true;
-        } else {
-          asyncExecute(address, new Runnable() {
-            public void run() {
-              Thread.currentThread().setName("AsyncSocketCloser for " + 
address);
-              try {
-                if (extra != null) {
-                  extra.run();
-                }
-                inlineClose(sock);
-              } finally {
-                Thread.currentThread().setName("unused AsyncSocketCloser");
+      if (isClosed()) {
+        // this SocketCloser has been closed so do a synchronous, inline, close
+        doItInline = true;
+      } else {
+        socketCloserThreadPool.execute(() -> {
+          if (runnableCode != null) {
+            runnableCode.run();
+          }
+          inlineClose(socket);
+        });
+        if (this.asyncCloseWaitTime != 0) {
+          try {
+            Future future = socketCloserThreadPool.submit(() -> {
+              if (runnableCode != null) {
+                runnableCode.run();
               }
-            }
-          });
+              inlineClose(socket);
+            });
+            future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
+          } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
+            // We want this code to wait at most 50ms for the close to happen.
+            // It is ok to ignore these exception and let the close continue
+            // in the background.
+          }
         }
       }
     } catch (OutOfMemoryError ignore) {
@@ -217,10 +167,10 @@ public class SocketCloser {
       doItInline = true;
     }
     if (doItInline) {
-      if (extra != null) {
-        extra.run();
+      if (runnableCode != null) {
+        runnableCode.run();
       }
-      inlineClose(sock);
+      inlineClose(socket);
     }
   }
 
@@ -228,19 +178,19 @@ public class SocketCloser {
   /**
    * Closes the specified socket
    * 
-   * @param sock the socket to close
+   * @param socket the socket to close
    */
-  private static void inlineClose(final Socket sock) {
+  private void inlineClose(final Socket socket) {
     // the next two statements are a mad attempt to fix bug
     // 36041 - segv in jrockit in pthread signaling code. This
     // seems to alleviate the problem.
     try {
-      sock.shutdownInput();
-      sock.shutdownOutput();
+      socket.shutdownInput();
+      socket.shutdownOutput();
     } catch (Exception e) {
     }
     try {
-      sock.close();
+      socket.close();
     } catch (IOException ignore) {
     } catch (VirtualMachineError err) {
       SystemFailure.initiateFailure(err);

http://git-wip-us.apache.org/repos/asf/geode/blob/31e4db0f/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 0ecb3bf..954a33c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -562,7 +562,7 @@ public class Connection implements Runnable {
       } catch (IOException io) {
         logger.fatal(LocalizedMessage
             
.create(LocalizedStrings.Connection_UNABLE_TO_GET_P2P_CONNECTION_STREAMS), io);
-        t.getSocketCloser().asyncClose(socket, this.remoteAddr.toString(), 
null);
+        t.getSocketCloser().asyncClose(socket, null);
         throw io;
       }
     }
@@ -847,7 +847,7 @@ public class Connection implements Runnable {
         Socket s = this.socket;
         if (s != null && !s.isClosed()) {
           prepareForAsyncClose();
-          this.owner.getSocketCloser().asyncClose(s, 
String.valueOf(this.remoteAddr), null);
+          this.owner.getSocketCloser().asyncClose(s, null);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/31e4db0f/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 044ab42..11c3bb3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -929,10 +929,6 @@ public class ConnectionTable {
               owner.getDM().getMembershipManager().getShutdownCause());
         }
       }
-
-      if (remoteAddress != null) {
-        this.socketCloser.releaseResourcesForAddress(remoteAddress.toString());
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/31e4db0f/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
index 942cad4..b6dbfe2 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
@@ -14,22 +14,21 @@
  */
 package org.apache.geode.internal.net;
 
-import static org.junit.Assert.*;
-
-import java.net.Socket;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.internal.net.SocketCloser;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.junit.categories.UnitTest;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Tests the default SocketCloser.
@@ -62,86 +61,49 @@ public class SocketCloserJUnitTest {
    */
   @Test
   public void testAsync() {
-    final CountDownLatch cdl = new CountDownLatch(1);
+    final CountDownLatch countDownLatch = new CountDownLatch(1);
     final AtomicInteger waitingToClose = new AtomicInteger(0);
-    Runnable r = new Runnable() {
-      @Override
-      public void run() {
-        try {
-          waitingToClose.incrementAndGet();
-          cdl.await();
-        } catch (InterruptedException e) {
-        }
-      }
-    };
 
     final int SOCKET_COUNT = 100;
-    final Socket[] aSockets = new Socket[SOCKET_COUNT];
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      aSockets[i] = createClosableSocket();
-    }
-    // Schedule a 100 sockets for async close.
-    // They should all be stuck on cdl.
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      this.socketCloser.asyncClose(aSockets[i], "A", r);
-    }
-    // Make sure the sockets have not been closed
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      assertEquals(false, aSockets[i].isClosed());
-    }
-    final Socket[] bSockets = new Socket[SOCKET_COUNT];
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      bSockets[i] = createClosableSocket();
-    }
+    final int REMOTE_CLIENT_COUNT = 200;
+
+    List<Socket> trackedSockets = new ArrayList<>();
     // Schedule a 100 sockets for async close.
-    // They should all be stuck on cdl.
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      this.socketCloser.asyncClose(bSockets[i], "B", r);
-    }
-    // Make sure the sockets have not been closed
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      assertEquals(false, bSockets[i].isClosed());
+    // They should all be stuck on countDownLatch.
+    for (int i = 0; i < REMOTE_CLIENT_COUNT; i++) {
+      Socket[] aSockets = new Socket[SOCKET_COUNT];
+
+      for (int j = 0; j < SOCKET_COUNT; j++) {
+        aSockets[j] = createClosableSocket();
+        trackedSockets.add(aSockets[j]);
+        this.socketCloser.asyncClose(aSockets[j], () -> {
+          try {
+            waitingToClose.incrementAndGet();
+            countDownLatch.await();
+          } catch (InterruptedException e) {
+          }
+        });
+      }
     }
+
     // close the socketCloser first to verify that the sockets
     // that have already been scheduled will be still be closed.
-    this.socketCloser.releaseResourcesForAddress("A");
     this.socketCloser.close();
-    // Each thread pool (one for A and one for B) has a max of 8 threads.
-    // So verify that this many are currently waiting on cdl.
-    {
-      final int maxThreads = this.socketCloser.getMaxThreads();
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          return waitingToClose.get() == 2 * maxThreads;
-        }
-
-        public String description() {
-          return "expected " + 2 * maxThreads + " waiters but found only " + 
waitingToClose.get();
-        }
-      };
-      Wait.waitForCriterion(wc, 5000, 10, true);
-    }
-    // now count down the latch that allows the sockets to close
-    cdl.countDown();
+    countDownLatch.countDown();
     // now all the sockets should get closed; use a wait criteria
     // since a thread pool is doing to closes
-    {
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          for (int i = 0; i < SOCKET_COUNT; i++) {
-            if (!aSockets[i].isClosed() || !bSockets[i].isClosed()) {
-              return false;
-            }
-          }
-          return true;
+    Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
+      boolean areAllClosed = true;
+      for (Iterator<Socket> iterator = trackedSockets.iterator(); 
iterator.hasNext();) {
+        Socket socket = iterator.next();
+        if (socket.isClosed()) {
+          iterator.remove();
+          continue;
         }
-
-        public String description() {
-          return "one or more sockets did not close";
-        }
-      };
-      Wait.waitForCriterion(wc, 5000, 10, true);
-    }
+        areAllClosed = false;
+      }
+      return areAllClosed;
+    });
   }
 
   /**
@@ -150,18 +112,11 @@ public class SocketCloserJUnitTest {
   @Test
   public void testClosedSocket() throws Exception {
     final AtomicBoolean runnableCalled = new AtomicBoolean();
-    Runnable r = new Runnable() {
-      @Override
-      public void run() {
-        runnableCalled.set(true);
-      }
-    };
 
     Socket s = createClosableSocket();
     s.close();
-    this.socketCloser.asyncClose(s, "A", r);
-    Wait.pause(10);
-    assertEquals(false, runnableCalled.get());
+    this.socketCloser.asyncClose(s, () -> runnableCalled.set(true));
+    Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> 
!runnableCalled.get());
   }
 
   /**
@@ -170,25 +125,11 @@ public class SocketCloserJUnitTest {
   @Test
   public void testClosedSocketCloser() {
     final AtomicBoolean runnableCalled = new AtomicBoolean();
-    Runnable r = new Runnable() {
-      @Override
-      public void run() {
-        runnableCalled.set(true);
-      }
-    };
 
-    final Socket s = createClosableSocket();
+    final Socket closableSocket = createClosableSocket();
     this.socketCloser.close();
-    this.socketCloser.asyncClose(s, "A", r);
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        return runnableCalled.get() && s.isClosed();
-      }
-
-      public String description() {
-        return "runnable was not called or socket was not closed";
-      }
-    };
-    Wait.waitForCriterion(wc, 5000, 10, true);
+    this.socketCloser.asyncClose(closableSocket, () -> 
runnableCalled.set(true));
+    Awaitility.await().atMost(5, TimeUnit.SECONDS)
+        .until(() -> runnableCalled.get() && closableSocket.isClosed());
   }
 }

Reply via email to