This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.12 by this push:
     new 95f95f5  GEODE-9819: fix durable client socket leak (#7271)
95f95f5 is described below

commit 95f95f52c8fa9f97206fc0b2a2d7c79a6114152a
Author: Darrel Schneider <[email protected]>
AuthorDate: Fri Jan 14 16:30:33 2022 -0800

    GEODE-9819: fix durable client socket leak (#7271)
    
    Added unit test that reproduced the socket leak.
    This involved some change to the product classes
    to make them unit testable.
    Fixed the leak by making sure socket.close is called
    if the response code was not successful.
    
    (cherry picked from commit 97601eb2cd585f844b7f02bb73ba42fcb86a7cb4)
---
 .../cache/tier/sockets/CacheClientNotifier.java    | 57 +++++++++++++++-------
 .../cache/tier/sockets/CacheClientProxy.java       |  5 +-
 .../tier/sockets/CacheClientNotifierTest.java      | 42 ++++++++++++++++
 3 files changed, 84 insertions(+), 20 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index b6d5f7c..1924ef3 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -124,17 +124,11 @@ public class CacheClientNotifier {
   @MakeNotStatic
   private static volatile CacheClientNotifier ccnSingleton;
 
-  private final SocketMessageWriter socketMessageWriter = new 
SocketMessageWriter();
+  private final SocketMessageWriter socketMessageWriter;
   private final ClientRegistrationEventQueueManager 
clientRegistrationEventQueueManager;
 
-  /**
-   * Factory method to construct a CacheClientNotifier {@code 
CacheClientNotifier} instance.
-   *
-   * @param cache The GemFire {@code InternalCache}
-   * @param clientRegistrationEventQueueManager Manages temporary registration 
queues for clients
-   * @return A {@code CacheClientNotifier} instance
-   */
-  public static synchronized CacheClientNotifier getInstance(InternalCache 
cache,
+  @VisibleForTesting
+  static CacheClientNotifier getInstance(InternalCache cache,
       ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
       StatisticsClock statisticsClock,
       CacheServerStats acceptorStats,
@@ -142,11 +136,12 @@ public class CacheClientNotifier {
       int messageTimeToLive,
       ConnectionListener listener,
       OverflowAttributes overflowAttributes,
-      boolean isGatewayReceiver) {
+      boolean isGatewayReceiver,
+      SocketMessageWriter socketMessageWriter) {
     if (ccnSingleton == null) {
       ccnSingleton = new CacheClientNotifier(cache, 
clientRegistrationEventQueueManager,
-          statisticsClock, acceptorStats,
-          maximumMessageCount, messageTimeToLive, listener, isGatewayReceiver);
+          statisticsClock, acceptorStats, maximumMessageCount, 
messageTimeToLive, listener,
+          isGatewayReceiver, socketMessageWriter);
     }
 
     if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
@@ -157,6 +152,27 @@ public class CacheClientNotifier {
     return ccnSingleton;
   }
 
+  /**
+   * Factory method to construct a CacheClientNotifier {@code 
CacheClientNotifier} instance.
+   *
+   * @param cache The GemFire {@code InternalCache}
+   * @param clientRegistrationEventQueueManager Manages temporary registration 
queues for clients
+   * @return A {@code CacheClientNotifier} instance
+   */
+  public static synchronized CacheClientNotifier getInstance(InternalCache 
cache,
+      ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
+      StatisticsClock statisticsClock,
+      CacheServerStats acceptorStats,
+      int maximumMessageCount,
+      int messageTimeToLive,
+      ConnectionListener listener,
+      OverflowAttributes overflowAttributes,
+      boolean isGatewayReceiver) {
+    return getInstance(cache, clientRegistrationEventQueueManager, 
statisticsClock,
+        acceptorStats, maximumMessageCount, messageTimeToLive, listener, 
overflowAttributes,
+        isGatewayReceiver, new SocketMessageWriter());
+  }
+
   public static CacheClientNotifier getInstance() {
     return ccnSingleton;
   }
@@ -433,15 +449,19 @@ public class CacheClientNotifier {
       if (logger.isDebugEnabled()) {
         logger.debug("CacheClientNotifier: Successfully registered {}", 
cacheClientProxy);
       }
+      performPostAuthorization(cacheClientProxy, clientProxyMembershipID, 
member,
+          sysProps,
+          subjectOrPrincipal);
     } else {
+      try {
+        // prevent leak by closing socket
+        socket.close();
+      } catch (IOException ignore) {
+      }
       logger.warn(
           "CacheClientNotifier: Unsuccessfully registered client with 
identifier {} and response code {}",
           new Object[] {clientProxyMembershipID, responseByte});
     }
-
-    performPostAuthorization(cacheClientProxy, clientProxyMembershipID, member,
-        sysProps,
-        subjectOrPrincipal);
   }
 
   private void handleAuthenticationException(final ClientProxyMembershipID 
clientProxyMembershipID,
@@ -1704,7 +1724,10 @@ public class CacheClientNotifier {
       StatisticsClock statisticsClock,
       CacheServerStats acceptorStats, int maximumMessageCount,
       int messageTimeToLive,
-      ConnectionListener listener, boolean isGatewayReceiver) {
+      ConnectionListener listener,
+      boolean isGatewayReceiver,
+      SocketMessageWriter socketMessageWriter) {
+    this.socketMessageWriter = socketMessageWriter;
     // Set the Cache
     setCache(cache);
     this.clientRegistrationEventQueueManager = 
clientRegistrationEventQueueManager;
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index c2926d7..cc91015 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -66,7 +66,6 @@ import org.apache.geode.cache.query.CqException;
 import org.apache.geode.cache.query.internal.cq.CqService;
 import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
 import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.OperationExecutors;
 import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.SystemTimer.SystemTimerTask;
@@ -2255,8 +2254,8 @@ public class CacheClientProxy implements ClientSession {
             
.putProxy(HARegionQueue.createRegionName(getProxy().getHARegionName()), 
getProxy());
         boolean createDurableQueue = proxy.proxyID.isDurable();
         boolean canHandleDelta = 
(proxy.clientVersion.compareTo(Version.GFE_61) >= 0)
-            && 
InternalDistributedSystem.getAnyInstance().getConfig().getDeltaPropagation()
-            && !(this._proxy.clientConflation == Handshake.CONFLATION_ON);
+            && 
proxy.getCache().getInternalDistributedSystem().getConfig().getDeltaPropagation()
+            && !(proxy.clientConflation == Handshake.CONFLATION_ON);
         if ((createDurableQueue || canHandleDelta) && logger.isDebugEnabled()) 
{
           logger.debug("Creating a {} subscription queue for {}",
               createDurableQueue ? "durable" : "non-durable",
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
index 437c979..527f78f 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
@@ -34,6 +34,7 @@ import java.net.Socket;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -45,8 +46,12 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.geode.Statistics;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.query.internal.cq.ServerCQ;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EnumListenerEvent;
@@ -57,6 +62,7 @@ import org.apache.geode.internal.cache.InternalCacheEvent;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.RegionQueueException;
 import org.apache.geode.internal.statistics.StatisticsClock;
+import org.apache.geode.internal.statistics.StatisticsManager;
 import org.apache.geode.test.fake.Fakes;
 
 public class CacheClientNotifierTest {
@@ -343,4 +349,40 @@ public class CacheClientNotifierTest {
       cacheClientNotifier.shutdown(0);
     }
   }
+
+  @Test
+  public void registerClientInternalWithDuplicateDurableClientClosesSocket() 
throws Exception {
+    InternalCache internalCache = Fakes.cache();
+    when(internalCache.getCCPTimer())
+        .thenReturn(mock(SystemTimer.class));
+    InternalDistributedSystem internalDistributedSystem = 
mock(InternalDistributedSystem.class);
+    when(internalCache.getInternalDistributedSystem())
+        .thenReturn(internalDistributedSystem);
+    
when(internalCache.getDistributedSystem()).thenReturn(internalDistributedSystem);
+    
when(internalDistributedSystem.getProperties()).thenReturn(mock(Properties.class));
+    StatisticsManager statisticsManager = mock(StatisticsManager.class);
+    when(internalDistributedSystem.getStatisticsManager())
+        .thenReturn(statisticsManager);
+    Statistics statistics = mock(Statistics.class);
+    when(statisticsManager.createAtomicStatistics(any(), any()))
+        .thenReturn(statistics);
+    CacheClientNotifier cacheClientNotifier = 
CacheClientNotifier.getInstance(internalCache,
+        mock(ClientRegistrationEventQueueManager.class), 
mock(StatisticsClock.class),
+        mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class), 
null, false,
+        mock(SocketMessageWriter.class));
+    ClientRegistrationMetadata metadata = 
mock(ClientRegistrationMetadata.class);
+    ClientProxyMembershipID id = mock(ClientProxyMembershipID.class);
+    CacheClientProxy proxy = mock(CacheClientProxy.class);
+    when(proxy.getProxyID()).thenReturn(id);
+    when(metadata.getClientProxyMembershipID()).thenReturn(id);
+    when(id.getDistributedMember()).thenReturn(mock(DistributedMember.class));
+    when(id.getDurableId()).thenReturn("durable");
+    when(id.isDurable()).thenReturn(true);
+    cacheClientNotifier.addClientProxy(proxy);
+    Socket socket = mock(Socket.class);
+
+    cacheClientNotifier.registerClientInternal(metadata, socket, true, 0, 
false);
+
+    verify(socket).close();
+  }
 }

Reply via email to