This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.14 by this push:
     new 14e92a0  GEODE-9819: fix durable client socket leak (#7266)
14e92a0 is described below

commit 14e92a03ad51b27e441385771d3ebdef73399d76
Author: Darrel Schneider <[email protected]>
AuthorDate: Thu Jan 13 17:02:49 2022 -0800

    GEODE-9819: fix durable client socket leak (#7266)
    
    Added a unit test that reproduces the socket leak.
    This involved some changes to the product classes
    to make them unit testable.
    Fixed the leak by making sure socket.close() is called
    if the response code was not successful.
    
    (cherry picked from commit 97601eb2cd585f844b7f02bb73ba42fcb86a7cb4)
---
 .../cache/tier/sockets/CacheClientNotifier.java    | 54 +++++++++++++++-------
 .../cache/tier/sockets/MessageDispatcher.java      |  5 +-
 .../tier/sockets/CacheClientNotifierTest.java      | 32 +++++++++++++
 3 files changed, 72 insertions(+), 19 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index da3c345..b9b5048 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -125,18 +125,12 @@ public class CacheClientNotifier {
   @MakeNotStatic
   private static volatile CacheClientNotifier ccnSingleton;
 
-  private final SocketMessageWriter socketMessageWriter = new 
SocketMessageWriter();
+  private final SocketMessageWriter socketMessageWriter;
   private final ClientRegistrationEventQueueManager 
clientRegistrationEventQueueManager;
   private final CacheClientProxyFactory cacheClientProxyFactory;
 
-  /**
-   * Factory method to construct a CacheClientNotifier {@code 
CacheClientNotifier} instance.
-   *
-   * @param cache The GemFire {@code InternalCache}
-   * @param clientRegistrationEventQueueManager Manages temporary registration 
queues for clients
-   * @return A {@code CacheClientNotifier} instance
-   */
-  public static synchronized CacheClientNotifier getInstance(InternalCache 
cache,
+  @VisibleForTesting
+  static CacheClientNotifier getInstance(InternalCache cache,
       ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
       StatisticsClock statisticsClock,
       CacheServerStats acceptorStats,
@@ -144,11 +138,12 @@ public class CacheClientNotifier {
       int messageTimeToLive,
       ConnectionListener listener,
       OverflowAttributes overflowAttributes,
-      boolean isGatewayReceiver) {
+      boolean isGatewayReceiver,
+      SocketMessageWriter socketMessageWriter) {
     if (ccnSingleton == null) {
       ccnSingleton = new CacheClientNotifier(cache, 
clientRegistrationEventQueueManager,
           statisticsClock, acceptorStats, maximumMessageCount, 
messageTimeToLive, listener,
-          isGatewayReceiver, new CacheClientProxyFactory());
+          isGatewayReceiver, new CacheClientProxyFactory(), 
socketMessageWriter);
     }
 
     if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
@@ -159,6 +154,27 @@ public class CacheClientNotifier {
     return ccnSingleton;
   }
 
+  /**
+   * Factory method to construct a CacheClientNotifier {@code 
CacheClientNotifier} instance.
+   *
+   * @param cache The GemFire {@code InternalCache}
+   * @param clientRegistrationEventQueueManager Manages temporary registration 
queues for clients
+   * @return A {@code CacheClientNotifier} instance
+   */
+  public static synchronized CacheClientNotifier getInstance(InternalCache 
cache,
+      ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
+      StatisticsClock statisticsClock,
+      CacheServerStats acceptorStats,
+      int maximumMessageCount,
+      int messageTimeToLive,
+      ConnectionListener listener,
+      OverflowAttributes overflowAttributes,
+      boolean isGatewayReceiver) {
+    return getInstance(cache, clientRegistrationEventQueueManager, 
statisticsClock,
+        acceptorStats, maximumMessageCount, messageTimeToLive, listener, 
overflowAttributes,
+        isGatewayReceiver, new SocketMessageWriter());
+  }
+
   public static CacheClientNotifier getInstance() {
     return ccnSingleton;
   }
@@ -435,15 +451,19 @@ public class CacheClientNotifier {
       if (logger.isDebugEnabled()) {
         logger.debug("CacheClientNotifier: Successfully registered {}", 
cacheClientProxy);
       }
+      performPostAuthorization(cacheClientProxy, clientProxyMembershipID, 
member,
+          sysProps,
+          subjectOrPrincipal);
     } else {
+      try {
+        // prevent leak by closing socket
+        socket.close();
+      } catch (IOException ignore) {
+      }
       logger.warn(
           "CacheClientNotifier: Unsuccessfully registered client with 
identifier {} and response code {}",
           new Object[] {clientProxyMembershipID, responseByte});
     }
-
-    performPostAuthorization(cacheClientProxy, clientProxyMembershipID, member,
-        sysProps,
-        subjectOrPrincipal);
   }
 
   private void handleAuthenticationException(final ClientProxyMembershipID 
clientProxyMembershipID,
@@ -1715,7 +1735,9 @@ public class CacheClientNotifier {
       int messageTimeToLive,
       ConnectionListener listener,
       boolean isGatewayReceiver,
-      CacheClientProxyFactory cacheClientProxyFactory) {
+      CacheClientProxyFactory cacheClientProxyFactory,
+      SocketMessageWriter socketMessageWriter) {
+    this.socketMessageWriter = socketMessageWriter;
     this.cacheClientProxyFactory = cacheClientProxyFactory;
     // Set the Cache
     setCache(cache);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
index c062530..b0cab4a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
@@ -36,7 +36,6 @@ import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.RegionExistsException;
 import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.ClientServerObserver;
 import org.apache.geode.internal.cache.ClientServerObserverHolder;
 import org.apache.geode.internal.cache.Conflatable;
@@ -140,8 +139,8 @@ public class MessageDispatcher extends LoggingThread {
           
.putProxy(HARegionQueue.createRegionName(getProxy().getHARegionName()), 
getProxy());
       boolean createDurableQueue = proxy.proxyID.isDurable();
       boolean canHandleDelta =
-          
InternalDistributedSystem.getAnyInstance().getConfig().getDeltaPropagation()
-              && !(_proxy.clientConflation == Handshake.CONFLATION_ON);
+          
proxy.getCache().getInternalDistributedSystem().getConfig().getDeltaPropagation()
+              && !(proxy.clientConflation == Handshake.CONFLATION_ON);
       if ((createDurableQueue || canHandleDelta) && logger.isDebugEnabled()) {
         logger.debug("Creating a {} subscription queue for {}",
             createDurableQueue ? "durable" : "non-durable",
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
index 502a36d..7651c68 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
@@ -32,6 +32,7 @@ import java.net.Socket;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
@@ -49,6 +50,7 @@ import org.apache.geode.CancelCriterion;
 import org.apache.geode.Statistics;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.query.internal.cq.ServerCQ;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.cache.EntryEventImpl;
@@ -361,4 +363,34 @@ public class CacheClientNotifierTest {
     return internalCacheEvent;
   }
 
+  @Test
+  public void registerClientInternalWithDuplicateDurableClientClosesSocket() 
throws Exception {
+    when(internalCache.getCCPTimer())
+        .thenReturn(mock(SystemTimer.class));
+    when(internalCache.getInternalDistributedSystem())
+        .thenReturn(internalDistributedSystem);
+    
when(internalCache.getDistributedSystem()).thenReturn(internalDistributedSystem);
+    
when(internalDistributedSystem.getProperties()).thenReturn(mock(Properties.class));
+    when(internalDistributedSystem.getStatisticsManager())
+        .thenReturn(statisticsManager);
+    when(statisticsManager.createAtomicStatistics(any(), any()))
+        .thenReturn(statistics);
+    cacheClientNotifier = CacheClientNotifier.getInstance(internalCache,
+        mock(ClientRegistrationEventQueueManager.class), 
mock(StatisticsClock.class),
+        mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class), 
null, false,
+        mock(SocketMessageWriter.class));
+    ClientRegistrationMetadata metadata = 
mock(ClientRegistrationMetadata.class);
+    ClientProxyMembershipID id = mock(ClientProxyMembershipID.class);
+    CacheClientProxy proxy = mock(CacheClientProxy.class);
+    when(proxy.getProxyID()).thenReturn(id);
+    when(metadata.getClientProxyMembershipID()).thenReturn(id);
+    when(id.getDistributedMember()).thenReturn(mock(DistributedMember.class));
+    when(id.getDurableId()).thenReturn("durable");
+    when(id.isDurable()).thenReturn(true);
+    cacheClientNotifier.addClientProxy(proxy);
+
+    cacheClientNotifier.registerClientInternal(metadata, socket, true, 0, 
false);
+
+    verify(socket).close();
+  }
 }

Reply via email to