This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.13 by this push:
new ad9b0f6 GEODE-9819: fix durable client socket leak (#7270)
ad9b0f6 is described below
commit ad9b0f6d68edd802061f2a56595370f4a46f15a5
Author: Darrel Schneider <[email protected]>
AuthorDate: Fri Jan 14 13:02:09 2022 -0800
GEODE-9819: fix durable client socket leak (#7270)
Added a unit test that reproduces the socket leak.
This required some changes to the product classes
to make them unit testable.
Fixed the leak by making sure socket.close() is called
if the response code is not successful.
(cherry picked from commit 97601eb2cd585f844b7f02bb73ba42fcb86a7cb4)
---
.../cache/tier/sockets/CacheClientNotifier.java | 54 +++++++++++++++-------
.../cache/tier/sockets/MessageDispatcher.java | 5 +-
.../tier/sockets/CacheClientNotifierTest.java | 42 +++++++++++++++++
3 files changed, 82 insertions(+), 19 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index 44ec0ba..911b2e0 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -124,18 +124,12 @@ public class CacheClientNotifier {
@MakeNotStatic
private static volatile CacheClientNotifier ccnSingleton;
- private final SocketMessageWriter socketMessageWriter = new
SocketMessageWriter();
+ private final SocketMessageWriter socketMessageWriter;
private final ClientRegistrationEventQueueManager
clientRegistrationEventQueueManager;
private final CacheClientProxyFactory cacheClientProxyFactory;
- /**
- * Factory method to construct a CacheClientNotifier {@code
CacheClientNotifier} instance.
- *
- * @param cache The GemFire {@code InternalCache}
- * @param clientRegistrationEventQueueManager Manages temporary registration
queues for clients
- * @return A {@code CacheClientNotifier} instance
- */
- public static synchronized CacheClientNotifier getInstance(InternalCache
cache,
+ @VisibleForTesting
+ static CacheClientNotifier getInstance(InternalCache cache,
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
StatisticsClock statisticsClock,
CacheServerStats acceptorStats,
@@ -143,11 +137,12 @@ public class CacheClientNotifier {
int messageTimeToLive,
ConnectionListener listener,
OverflowAttributes overflowAttributes,
- boolean isGatewayReceiver) {
+ boolean isGatewayReceiver,
+ SocketMessageWriter socketMessageWriter) {
if (ccnSingleton == null) {
ccnSingleton = new CacheClientNotifier(cache,
clientRegistrationEventQueueManager,
statisticsClock, acceptorStats, maximumMessageCount,
messageTimeToLive, listener,
- isGatewayReceiver, new CacheClientProxyFactory());
+ isGatewayReceiver, new CacheClientProxyFactory(),
socketMessageWriter);
}
if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
@@ -158,6 +153,27 @@ public class CacheClientNotifier {
return ccnSingleton;
}
+ /**
+ * Factory method to construct a CacheClientNotifier {@code
CacheClientNotifier} instance.
+ *
+ * @param cache The GemFire {@code InternalCache}
+ * @param clientRegistrationEventQueueManager Manages temporary registration
queues for clients
+ * @return A {@code CacheClientNotifier} instance
+ */
+ public static synchronized CacheClientNotifier getInstance(InternalCache
cache,
+ ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
+ StatisticsClock statisticsClock,
+ CacheServerStats acceptorStats,
+ int maximumMessageCount,
+ int messageTimeToLive,
+ ConnectionListener listener,
+ OverflowAttributes overflowAttributes,
+ boolean isGatewayReceiver) {
+ return getInstance(cache, clientRegistrationEventQueueManager,
statisticsClock,
+ acceptorStats, maximumMessageCount, messageTimeToLive, listener,
overflowAttributes,
+ isGatewayReceiver, new SocketMessageWriter());
+ }
+
public static CacheClientNotifier getInstance() {
return ccnSingleton;
}
@@ -434,15 +450,19 @@ public class CacheClientNotifier {
if (logger.isDebugEnabled()) {
logger.debug("CacheClientNotifier: Successfully registered {}",
cacheClientProxy);
}
+ performPostAuthorization(cacheClientProxy, clientProxyMembershipID,
member,
+ sysProps,
+ subjectOrPrincipal);
} else {
+ try {
+ // prevent leak by closing socket
+ socket.close();
+ } catch (IOException ignore) {
+ }
logger.warn(
"CacheClientNotifier: Unsuccessfully registered client with
identifier {} and response code {}",
new Object[] {clientProxyMembershipID, responseByte});
}
-
- performPostAuthorization(cacheClientProxy, clientProxyMembershipID, member,
- sysProps,
- subjectOrPrincipal);
}
private void handleAuthenticationException(final ClientProxyMembershipID
clientProxyMembershipID,
@@ -1708,7 +1728,9 @@ public class CacheClientNotifier {
int messageTimeToLive,
ConnectionListener listener,
boolean isGatewayReceiver,
- CacheClientProxyFactory cacheClientProxyFactory) {
+ CacheClientProxyFactory cacheClientProxyFactory,
+ SocketMessageWriter socketMessageWriter) {
+ this.socketMessageWriter = socketMessageWriter;
this.cacheClientProxyFactory = cacheClientProxyFactory;
// Set the Cache
setCache(cache);
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
index 11258da..ff20899 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
@@ -36,7 +36,6 @@ import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.ClientServerObserver;
import org.apache.geode.internal.cache.ClientServerObserverHolder;
import org.apache.geode.internal.cache.Conflatable;
@@ -141,8 +140,8 @@ public class MessageDispatcher extends LoggingThread {
.putProxy(HARegionQueue.createRegionName(getProxy().getHARegionName()),
getProxy());
boolean createDurableQueue = proxy.proxyID.isDurable();
boolean canHandleDelta =
(proxy.getClientVersion().compareTo(Version.GFE_61) >= 0)
- &&
InternalDistributedSystem.getAnyInstance().getConfig().getDeltaPropagation()
- && !(this._proxy.clientConflation == Handshake.CONFLATION_ON);
+ &&
proxy.getCache().getInternalDistributedSystem().getConfig().getDeltaPropagation()
+ && !(proxy.clientConflation == Handshake.CONFLATION_ON);
if ((createDurableQueue || canHandleDelta) && logger.isDebugEnabled()) {
logger.debug("Creating a {} subscription queue for {}",
createDurableQueue ? "durable" : "non-durable",
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
index 437c979..527f78f 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
@@ -34,6 +34,7 @@ import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -45,8 +46,12 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.apache.geode.Statistics;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.query.internal.cq.ServerCQ;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
@@ -57,6 +62,7 @@ import org.apache.geode.internal.cache.InternalCacheEvent;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RegionQueueException;
import org.apache.geode.internal.statistics.StatisticsClock;
+import org.apache.geode.internal.statistics.StatisticsManager;
import org.apache.geode.test.fake.Fakes;
public class CacheClientNotifierTest {
@@ -343,4 +349,40 @@ public class CacheClientNotifierTest {
cacheClientNotifier.shutdown(0);
}
}
+
+ @Test
+ public void registerClientInternalWithDuplicateDurableClientClosesSocket()
throws Exception {
+ InternalCache internalCache = Fakes.cache();
+ when(internalCache.getCCPTimer())
+ .thenReturn(mock(SystemTimer.class));
+ InternalDistributedSystem internalDistributedSystem =
mock(InternalDistributedSystem.class);
+ when(internalCache.getInternalDistributedSystem())
+ .thenReturn(internalDistributedSystem);
+
when(internalCache.getDistributedSystem()).thenReturn(internalDistributedSystem);
+
when(internalDistributedSystem.getProperties()).thenReturn(mock(Properties.class));
+ StatisticsManager statisticsManager = mock(StatisticsManager.class);
+ when(internalDistributedSystem.getStatisticsManager())
+ .thenReturn(statisticsManager);
+ Statistics statistics = mock(Statistics.class);
+ when(statisticsManager.createAtomicStatistics(any(), any()))
+ .thenReturn(statistics);
+ CacheClientNotifier cacheClientNotifier =
CacheClientNotifier.getInstance(internalCache,
+ mock(ClientRegistrationEventQueueManager.class),
mock(StatisticsClock.class),
+ mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class),
null, false,
+ mock(SocketMessageWriter.class));
+ ClientRegistrationMetadata metadata =
mock(ClientRegistrationMetadata.class);
+ ClientProxyMembershipID id = mock(ClientProxyMembershipID.class);
+ CacheClientProxy proxy = mock(CacheClientProxy.class);
+ when(proxy.getProxyID()).thenReturn(id);
+ when(metadata.getClientProxyMembershipID()).thenReturn(id);
+ when(id.getDistributedMember()).thenReturn(mock(DistributedMember.class));
+ when(id.getDurableId()).thenReturn("durable");
+ when(id.isDurable()).thenReturn(true);
+ cacheClientNotifier.addClientProxy(proxy);
+ Socket socket = mock(Socket.class);
+
+ cacheClientNotifier.registerClientInternal(metadata, socket, true, 0,
false);
+
+ verify(socket).close();
+ }
}