This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.14 by this push:
new 14e92a0 GEODE-9819: fix durable client socket leak (#7266)
14e92a0 is described below
commit 14e92a03ad51b27e441385771d3ebdef73399d76
Author: Darrel Schneider <[email protected]>
AuthorDate: Thu Jan 13 17:02:49 2022 -0800
GEODE-9819: fix durable client socket leak (#7266)
Added unit test that reproduced the socket leak.
This involved some change to the product classes
to make them unit testable.
Fixed the leak by making sure socket.close is called
if the response code was not successful.
(cherry picked from commit 97601eb2cd585f844b7f02bb73ba42fcb86a7cb4)
---
.../cache/tier/sockets/CacheClientNotifier.java | 54 +++++++++++++++-------
.../cache/tier/sockets/MessageDispatcher.java | 5 +-
.../tier/sockets/CacheClientNotifierTest.java | 32 +++++++++++++
3 files changed, 72 insertions(+), 19 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index da3c345..b9b5048 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -125,18 +125,12 @@ public class CacheClientNotifier {
@MakeNotStatic
private static volatile CacheClientNotifier ccnSingleton;
- private final SocketMessageWriter socketMessageWriter = new
SocketMessageWriter();
+ private final SocketMessageWriter socketMessageWriter;
private final ClientRegistrationEventQueueManager
clientRegistrationEventQueueManager;
private final CacheClientProxyFactory cacheClientProxyFactory;
- /**
- * Factory method to construct a CacheClientNotifier {@code
CacheClientNotifier} instance.
- *
- * @param cache The GemFire {@code InternalCache}
- * @param clientRegistrationEventQueueManager Manages temporary registration
queues for clients
- * @return A {@code CacheClientNotifier} instance
- */
- public static synchronized CacheClientNotifier getInstance(InternalCache
cache,
+ @VisibleForTesting
+ static CacheClientNotifier getInstance(InternalCache cache,
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
StatisticsClock statisticsClock,
CacheServerStats acceptorStats,
@@ -144,11 +138,12 @@ public class CacheClientNotifier {
int messageTimeToLive,
ConnectionListener listener,
OverflowAttributes overflowAttributes,
- boolean isGatewayReceiver) {
+ boolean isGatewayReceiver,
+ SocketMessageWriter socketMessageWriter) {
if (ccnSingleton == null) {
ccnSingleton = new CacheClientNotifier(cache,
clientRegistrationEventQueueManager,
statisticsClock, acceptorStats, maximumMessageCount,
messageTimeToLive, listener,
- isGatewayReceiver, new CacheClientProxyFactory());
+ isGatewayReceiver, new CacheClientProxyFactory(),
socketMessageWriter);
}
if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
@@ -159,6 +154,27 @@ public class CacheClientNotifier {
return ccnSingleton;
}
+ /**
+ * Factory method to construct a CacheClientNotifier {@code
CacheClientNotifier} instance.
+ *
+ * @param cache The GemFire {@code InternalCache}
+ * @param clientRegistrationEventQueueManager Manages temporary registration
queues for clients
+ * @return A {@code CacheClientNotifier} instance
+ */
+ public static synchronized CacheClientNotifier getInstance(InternalCache
cache,
+ ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
+ StatisticsClock statisticsClock,
+ CacheServerStats acceptorStats,
+ int maximumMessageCount,
+ int messageTimeToLive,
+ ConnectionListener listener,
+ OverflowAttributes overflowAttributes,
+ boolean isGatewayReceiver) {
+ return getInstance(cache, clientRegistrationEventQueueManager,
statisticsClock,
+ acceptorStats, maximumMessageCount, messageTimeToLive, listener,
overflowAttributes,
+ isGatewayReceiver, new SocketMessageWriter());
+ }
+
public static CacheClientNotifier getInstance() {
return ccnSingleton;
}
@@ -435,15 +451,19 @@ public class CacheClientNotifier {
if (logger.isDebugEnabled()) {
logger.debug("CacheClientNotifier: Successfully registered {}",
cacheClientProxy);
}
+ performPostAuthorization(cacheClientProxy, clientProxyMembershipID,
member,
+ sysProps,
+ subjectOrPrincipal);
} else {
+ try {
+ // prevent leak by closing socket
+ socket.close();
+ } catch (IOException ignore) {
+ }
logger.warn(
"CacheClientNotifier: Unsuccessfully registered client with
identifier {} and response code {}",
new Object[] {clientProxyMembershipID, responseByte});
}
-
- performPostAuthorization(cacheClientProxy, clientProxyMembershipID, member,
- sysProps,
- subjectOrPrincipal);
}
private void handleAuthenticationException(final ClientProxyMembershipID
clientProxyMembershipID,
@@ -1715,7 +1735,9 @@ public class CacheClientNotifier {
int messageTimeToLive,
ConnectionListener listener,
boolean isGatewayReceiver,
- CacheClientProxyFactory cacheClientProxyFactory) {
+ CacheClientProxyFactory cacheClientProxyFactory,
+ SocketMessageWriter socketMessageWriter) {
+ this.socketMessageWriter = socketMessageWriter;
this.cacheClientProxyFactory = cacheClientProxyFactory;
// Set the Cache
setCache(cache);
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
index c062530..b0cab4a 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
@@ -36,7 +36,6 @@ import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.ClientServerObserver;
import org.apache.geode.internal.cache.ClientServerObserverHolder;
import org.apache.geode.internal.cache.Conflatable;
@@ -140,8 +139,8 @@ public class MessageDispatcher extends LoggingThread {
.putProxy(HARegionQueue.createRegionName(getProxy().getHARegionName()),
getProxy());
boolean createDurableQueue = proxy.proxyID.isDurable();
boolean canHandleDelta =
-
InternalDistributedSystem.getAnyInstance().getConfig().getDeltaPropagation()
- && !(_proxy.clientConflation == Handshake.CONFLATION_ON);
+
proxy.getCache().getInternalDistributedSystem().getConfig().getDeltaPropagation()
+ && !(proxy.clientConflation == Handshake.CONFLATION_ON);
if ((createDurableQueue || canHandleDelta) && logger.isDebugEnabled()) {
logger.debug("Creating a {} subscription queue for {}",
createDurableQueue ? "durable" : "non-durable",
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
index 502a36d..7651c68 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
@@ -32,6 +32,7 @@ import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
@@ -49,6 +50,7 @@ import org.apache.geode.CancelCriterion;
import org.apache.geode.Statistics;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.query.internal.cq.ServerCQ;
+import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.cache.EntryEventImpl;
@@ -361,4 +363,34 @@ public class CacheClientNotifierTest {
return internalCacheEvent;
}
+ @Test
+ public void registerClientInternalWithDuplicateDurableClientClosesSocket()
throws Exception {
+ when(internalCache.getCCPTimer())
+ .thenReturn(mock(SystemTimer.class));
+ when(internalCache.getInternalDistributedSystem())
+ .thenReturn(internalDistributedSystem);
+
when(internalCache.getDistributedSystem()).thenReturn(internalDistributedSystem);
+
when(internalDistributedSystem.getProperties()).thenReturn(mock(Properties.class));
+ when(internalDistributedSystem.getStatisticsManager())
+ .thenReturn(statisticsManager);
+ when(statisticsManager.createAtomicStatistics(any(), any()))
+ .thenReturn(statistics);
+ cacheClientNotifier = CacheClientNotifier.getInstance(internalCache,
+ mock(ClientRegistrationEventQueueManager.class),
mock(StatisticsClock.class),
+ mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class),
null, false,
+ mock(SocketMessageWriter.class));
+ ClientRegistrationMetadata metadata =
mock(ClientRegistrationMetadata.class);
+ ClientProxyMembershipID id = mock(ClientProxyMembershipID.class);
+ CacheClientProxy proxy = mock(CacheClientProxy.class);
+ when(proxy.getProxyID()).thenReturn(id);
+ when(metadata.getClientProxyMembershipID()).thenReturn(id);
+ when(id.getDistributedMember()).thenReturn(mock(DistributedMember.class));
+ when(id.getDurableId()).thenReturn("durable");
+ when(id.isDurable()).thenReturn(true);
+ cacheClientNotifier.addClientProxy(proxy);
+
+ cacheClientNotifier.registerClientInternal(metadata, socket, true, 0,
false);
+
+ verify(socket).close();
+ }
}