This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 3ef1905 GEODE-8817: server hangs in cache.close() (#5987)
3ef1905 is described below
commit 3ef1905e8b88d99ac3071f94d830ae23010e3d2c
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Tue Feb 9 07:56:45 2021 -0800
GEODE-8817: server hangs in cache.close() (#5987)
* possible fix
* Revert "possible fix"
This reverts commit f8451da33d3568c1900bda2b7a4bb3f4fef660ba.
Testing showed that SSLSocket.close() still hangs when there is a
network failure and another thread is reading from the socket.
* revert socket timeout and introduce a SocketCloser executor
* SocketCloser cleanup
* change cleanup to be performed after the socket is closed
* address Bill's review comments
* address Bill's follow-up review comments
* remove an unnecessary method annotation
---
.../cache/tier/sockets/CacheClientProxyTest.java | 2 +-
.../internal/net/SocketCloserIntegrationTest.java | 28 +++--
.../internal/LonerDistributionManager.java | 137 ---------------------
.../apache/geode/internal/cache/tier/Acceptor.java | 3 +
.../internal/cache/tier/sockets/AcceptorImpl.java | 9 ++
.../cache/tier/sockets/CacheClientProxy.java | 4 +-
.../cache/tier/sockets/ServerConnection.java | 20 ++-
.../apache/geode/internal/net/SocketCloser.java | 72 ++++++-----
8 files changed, 92 insertions(+), 183 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
index 0158946..079c54e 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
@@ -65,7 +65,7 @@ public class CacheClientProxyTest {
final InetAddress address = mock(InetAddress.class);
when(socket.getInetAddress()).thenReturn(address);
when(address.getHostAddress()).thenReturn("localhost");
- doNothing().when(sc).asyncClose(any(), eq("localhost"), eq(null));
+ doNothing().when(sc).asyncClose(any(), eq("localhost"),
any(Runnable.class));
final ClientProxyMembershipID proxyID =
mock(ClientProxyMembershipID.class);
final DistributedMember member =
cache.getDistributedSystem().getDistributedMember();
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SocketCloserIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SocketCloserIntegrationTest.java
index b2db1b8..ee7ac5c 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SocketCloserIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SocketCloserIntegrationTest.java
@@ -113,13 +113,17 @@ public class SocketCloserIntegrationTest {
* Verify that requesting an asyncClose on an already closed socket is a
noop.
*/
@Test
- public void testClosedSocket() throws Exception {
- final AtomicBoolean runnableCalled = new AtomicBoolean();
+ public void testOpenSocketCloser() {
+ final AtomicBoolean beforeSocketCloseRunnableWasCalled = new
AtomicBoolean();
+ final AtomicBoolean afterSocketCloseRunnableWasCalled = new
AtomicBoolean();
- Socket s = createClosableSocket();
- s.close();
- this.socketCloser.asyncClose(s, "A", () -> runnableCalled.set(true));
- await().until(() -> !runnableCalled.get());
+ final Socket closableSocket = createClosableSocket();
+ this.socketCloser.asyncClose(closableSocket, "A",
+ () -> beforeSocketCloseRunnableWasCalled.set(true),
+ () -> afterSocketCloseRunnableWasCalled.set(true));
+ await().until(() -> beforeSocketCloseRunnableWasCalled.get());
+ await().until(() -> closableSocket.isClosed());
+ await().until(() -> afterSocketCloseRunnableWasCalled.get());
}
/**
@@ -127,12 +131,16 @@ public class SocketCloserIntegrationTest {
*/
@Test
public void testClosedSocketCloser() {
- final AtomicBoolean runnableCalled = new AtomicBoolean();
+ final AtomicBoolean beforeSocketCloseRunnableWasCalled = new
AtomicBoolean();
+ final AtomicBoolean afterSocketCloseRunnableWasCalled = new
AtomicBoolean();
final Socket closableSocket = createClosableSocket();
this.socketCloser.close();
- this.socketCloser.asyncClose(closableSocket, "A", () ->
runnableCalled.set(true));
- await()
- .until(() -> runnableCalled.get() && closableSocket.isClosed());
+ this.socketCloser.asyncClose(closableSocket, "A",
+ () -> beforeSocketCloseRunnableWasCalled.set(true),
+ () -> afterSocketCloseRunnableWasCalled.set(true));
+ await().until(() -> beforeSocketCloseRunnableWasCalled.get());
+ await().until(() -> closableSocket.isClosed());
+ await().until(() -> afterSocketCloseRunnableWasCalled.get());
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index 532ca01..08dab41 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -23,14 +23,10 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.geode.CancelCriterion;
import org.apache.geode.InternalGemFireError;
@@ -1023,139 +1019,6 @@ public class LonerDistributionManager implements DistributionManager {
return 0;
}
}
- protected static class DummyExecutor implements ExecutorService {
- @Override
- public void execute(Runnable command) {
- command.run();
- }
-
- @Override
- public void shutdown() {}
-
- @Override
- public List<Runnable> shutdownNow() {
- return Collections.emptyList();
- }
-
- @Override
- public boolean isShutdown() {
- return false;
- }
-
- @Override
- public boolean isTerminated() {
- return false;
- }
-
- @Override
- public boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException {
- return true;
- }
-
- @Override
- public <T> Future<T> submit(final Callable<T> task) {
- Exception ex = null;
- T result = null;
- try {
- result = task.call();
- } catch (Exception e) {
- ex = e;
- }
- return new CompletedFuture<T>(result, ex);
- }
-
- @Override
- public <T> Future<T> submit(final Runnable task, final T result) {
- return submit(new Callable<T>() {
- @Override
- public T call() throws Exception {
- task.run();
- return result;
- }
- });
- }
-
- @Override
- public Future<?> submit(Runnable task) {
- return submit(task, null);
- }
-
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>>
tasks)
- throws InterruptedException {
- List<Future<T>> results = new ArrayList<Future<T>>();
- for (Callable<T> task : tasks) {
- results.add(submit(task));
- }
- return results;
- }
-
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>>
tasks, long timeout,
- TimeUnit unit) throws InterruptedException {
- return invokeAll(tasks);
- }
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException {
-
- ExecutionException ex = null;
- for (Callable<T> task : tasks) {
- try {
- return submit(task).get();
- } catch (ExecutionException e) {
- ex = e;
- }
- }
- throw (ExecutionException) ex.fillInStackTrace();
- }
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long
timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- return invokeAny(tasks);
- }
- }
-
- private static class CompletedFuture<T> implements Future<T> {
- private final T result;
- private final Exception ex;
-
- public CompletedFuture(T result, Exception ex) {
- this.result = result;
- this.ex = ex;
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return false;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return true;
- }
-
- @Override
- public T get() throws InterruptedException, ExecutionException {
- if (ex != null) {
- throw new ExecutionException(ex);
- }
- return result;
- }
-
- @Override
- public T get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- return get();
- }
- }
@Override
public void throwIfDistributionStopped() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
index 2982549..b5c0a34 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
@@ -25,6 +25,7 @@ import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor;
import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
import org.apache.geode.internal.cache.tier.sockets.ConnectionListener;
import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.internal.net.SocketCloser;
/**
* Defines the message listener/acceptor interface which is the GemFire cache
server. Multiple
@@ -107,4 +108,6 @@ public interface Acceptor extends CommBufferPool {
void decClientServerConnectionCount();
int getMaximumTimeBetweenPings();
+
+ SocketCloser getSocketCloser();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 1dd4cb7..e7d71dc 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -93,6 +93,7 @@ import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
import org.apache.geode.internal.inet.LocalHostUtil;
import org.apache.geode.internal.logging.CoreLoggingExecutors;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.net.SocketCloser;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.serialization.KnownVersion;
@@ -349,6 +350,8 @@ public class AcceptorImpl implements Acceptor, Runnable {
private final ServerConnectionFactory serverConnectionFactory;
+ private final SocketCloser socketCloser = new SocketCloser();
+
/**
* Constructs an AcceptorImpl for use within a CacheServer.
*
@@ -1707,6 +1710,7 @@ public class AcceptorImpl implements Acceptor, Runnable {
for (ServerConnection serverConnection : snap) {
serverConnection.cleanup();
}
+ socketCloser.close();
}
}
@@ -1872,6 +1876,11 @@ public class AcceptorImpl implements Acceptor, Runnable {
return maximumTimeBetweenPings;
}
+ @Override
+ public SocketCloser getSocketCloser() {
+ return socketCloser;
+ }
+
private static class ClientQueueInitializerTask implements Runnable {
private final Socket socket;
private final boolean isPrimaryServerToClient;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index c17b302..ec006ca 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -954,7 +954,9 @@ public class CacheClientProxy implements ClientSession {
String remoteHostAddress = this._remoteHostAddress;
if (this._socketClosed.compareAndSet(false, true) && remoteHostAddress !=
null) {
// Only one thread is expected to close the socket
- this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket,
remoteHostAddress, null);
+ this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket,
remoteHostAddress,
+ () -> {
+ });
getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
return true;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 29eec6a..625da9e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -1412,11 +1412,22 @@ public abstract class ServerConnection implements Runnable {
getAcceptor().decClientServerConnectionCount();
}
- try {
- theSocket.close();
- } catch (Exception ignored) {
- }
+ if (!theSocket.isClosed()) {
+ // Here we direct closing of sockets to one of two executors. Use of an
executor
+ // keeps us from causing an explosion of new threads when a server is
shut down.
+ // Background threads are used in case the close() operation on the
socket hangs.
+ final String closerName =
+ communicationMode.isWAN() ? "WANSocketCloser" :
"CacheServerSocketCloser";
+ acceptor.getSocketCloser().asyncClose(theSocket, closerName, () -> {
+ },
+ () -> cleanupAfterSocketClose());
+ return true;
+ }
+ cleanupAfterSocketClose();
+ return true;
+ }
+ protected void cleanupAfterSocketClose() {
try {
if (postAuthzRequest != null) {
postAuthzRequest.close();
@@ -1437,7 +1448,6 @@ public abstract class ServerConnection implements Runnable {
}
releaseCommBuffer();
processMessages = false;
- return true;
}
private void releaseCommBuffer() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
index 15a129e..3613550 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
@@ -16,6 +16,7 @@ package org.apache.geode.internal.net;
import java.io.IOException;
import java.net.Socket;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -25,6 +26,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
+import org.jetbrains.annotations.NotNull;
+
import org.apache.geode.SystemFailure;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
@@ -66,7 +69,6 @@ public class SocketCloser {
*/
private final ConcurrentHashMap<String, ExecutorService> asyncCloseExecutors
=
new ConcurrentHashMap<>();
- private final long asyncClosePoolKeepAliveSeconds;
private final int asyncClosePoolMaxThreads;
private final long asyncCloseWaitTime;
private final TimeUnit asyncCloseWaitUnits;
@@ -91,7 +93,6 @@ public class SocketCloser {
public SocketCloser(long asyncClosePoolKeepAliveSeconds, int
asyncClosePoolMaxThreads,
long asyncCloseWaitTime, TimeUnit asyncCloseWaitUnits) {
- this.asyncClosePoolKeepAliveSeconds = asyncClosePoolKeepAliveSeconds;
this.asyncClosePoolMaxThreads = asyncClosePoolMaxThreads;
this.asyncCloseWaitTime = asyncCloseWaitTime;
this.asyncCloseWaitUnits = asyncCloseWaitUnits;
@@ -163,60 +164,73 @@ public class SocketCloser {
}
/**
- * Closes the specified socket in a background thread. In some cases we see
close hang (see bug
- * 33665). Depending on how the SocketCloser is configured (see
ASYNC_CLOSE_WAIT_MILLISECONDS)
+ * Closes the specified socket in a background thread. In some cases we see
close hang.
+ * Depending on how the SocketCloser is configured (see
ASYNC_CLOSE_WAIT_MILLISECONDS)
+ * this method may block for a certain amount of time. If it is called after
the SocketCloser is
+ * closed then a normal synchronous close is done.
+ *
+ * @param socket the socket to close
+ * @param address key used to determine which executor to use. Usually the
address of a peer or
+ * client
+ * @param runBeforeClose a Runnable with stuff to execute before the socket
is closed
+ */
+ public void asyncClose(final Socket socket, final String address, final
Runnable runBeforeClose) {
+ asyncClose(socket, address, runBeforeClose, () -> {
+ });
+ }
+
+ /**
+ * Closes the specified socket in a background thread. In some cases we see
close hang.
+ * Depending on how the SocketCloser is configured (see
ASYNC_CLOSE_WAIT_MILLISECONDS)
* this method may block for a certain amount of time. If it is called after
the SocketCloser is
* closed then a normal synchronous close is done.
*
* @param socket the socket to close
- * @param address identifies who the socket is connected to
- * @param extra an optional Runnable with stuff to execute before the socket
is closed
+ * @param address key used to determine which executor to use. Usually the
address of a peer or
+ * client
+ * @param runBeforeClose a Runnable with stuff to execute before the socket
is closed
+ * @param runAfterClose a Runnable with stuff to execute after the socket is
closed
*/
- public void asyncClose(final Socket socket, final String address, final
Runnable extra) {
- if (socket == null || socket.isClosed()) {
+ public void asyncClose(@NotNull final Socket socket, @NotNull final String
address,
+ @NotNull final Runnable runBeforeClose,
+ @NotNull final Runnable runAfterClose) {
+ if (socket.isClosed()) {
return;
}
boolean doItInline = false;
try {
- Future submittedTask = null;
+ Future submittedTask = CompletableFuture.completedFuture(this);
closedLock.lock();
try {
if (closed) {
// this SocketCloser has been closed so do a synchronous, inline,
close
doItInline = true;
} else {
- submittedTask = asyncExecute(address, new Runnable() {
- @Override
- public void run() {
- Thread.currentThread().setName("AsyncSocketCloser for " +
address);
- try {
- if (extra != null) {
- extra.run();
- }
- inlineClose(socket);
- } finally {
- Thread.currentThread().setName("unused AsyncSocketCloser");
- }
+ submittedTask = asyncExecute(address, () -> {
+ Thread.currentThread().setName("AsyncSocketCloser for " + address);
+ try {
+ runBeforeClose.run();
+ inlineClose(socket);
+ runAfterClose.run();
+ } finally {
+ Thread.currentThread().setName("unused AsyncSocketCloser");
}
});
}
} finally {
closedLock.unlock();
}
- if (submittedTask != null) {
+ if (!doItInline) {
waitForFutureTaskWithTimeout(submittedTask);
+ return;
}
} catch (RejectedExecutionException | OutOfMemoryError ignore) {
// If we can't start a thread to close the socket just do it inline.
// See bug 50573.
- doItInline = true;
- }
- if (doItInline) {
- if (extra != null) {
- extra.run();
- }
- inlineClose(socket);
}
+ runBeforeClose.run();
+ inlineClose(socket);
+ runAfterClose.run();
}
private void waitForFutureTaskWithTimeout(Future submittedTask) {