This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.13 by this push:
     new ad9b0f6  GEODE-9819: fix durable client socket leak (#7270)
ad9b0f6 is described below

commit ad9b0f6d68edd802061f2a56595370f4a46f15a5
Author: Darrel Schneider <[email protected]>
AuthorDate: Fri Jan 14 13:02:09 2022 -0800

    GEODE-9819: fix durable client socket leak (#7270)
    
    Added a unit test that reproduces the socket leak.
    This required some changes to the product classes
    to make them unit testable.
    Fixed the leak by making sure socket.close() is called
    when the response code is not successful.
    
    (cherry picked from commit 97601eb2cd585f844b7f02bb73ba42fcb86a7cb4)
---
 .../cache/tier/sockets/CacheClientNotifier.java    | 54 +++++++++++++++-------
 .../cache/tier/sockets/MessageDispatcher.java      |  5 +-
 .../tier/sockets/CacheClientNotifierTest.java      | 42 +++++++++++++++++
 3 files changed, 82 insertions(+), 19 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index 44ec0ba..911b2e0 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -124,18 +124,12 @@ public class CacheClientNotifier {
   @MakeNotStatic
   private static volatile CacheClientNotifier ccnSingleton;
 
-  private final SocketMessageWriter socketMessageWriter = new 
SocketMessageWriter();
+  private final SocketMessageWriter socketMessageWriter;
   private final ClientRegistrationEventQueueManager 
clientRegistrationEventQueueManager;
   private final CacheClientProxyFactory cacheClientProxyFactory;
 
-  /**
-   * Factory method to construct a CacheClientNotifier {@code 
CacheClientNotifier} instance.
-   *
-   * @param cache The GemFire {@code InternalCache}
-   * @param clientRegistrationEventQueueManager Manages temporary registration 
queues for clients
-   * @return A {@code CacheClientNotifier} instance
-   */
-  public static synchronized CacheClientNotifier getInstance(InternalCache 
cache,
+  @VisibleForTesting
+  static CacheClientNotifier getInstance(InternalCache cache,
       ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
       StatisticsClock statisticsClock,
       CacheServerStats acceptorStats,
@@ -143,11 +137,12 @@ public class CacheClientNotifier {
       int messageTimeToLive,
       ConnectionListener listener,
       OverflowAttributes overflowAttributes,
-      boolean isGatewayReceiver) {
+      boolean isGatewayReceiver,
+      SocketMessageWriter socketMessageWriter) {
     if (ccnSingleton == null) {
       ccnSingleton = new CacheClientNotifier(cache, 
clientRegistrationEventQueueManager,
           statisticsClock, acceptorStats, maximumMessageCount, 
messageTimeToLive, listener,
-          isGatewayReceiver, new CacheClientProxyFactory());
+          isGatewayReceiver, new CacheClientProxyFactory(), 
socketMessageWriter);
     }
 
     if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
@@ -158,6 +153,27 @@ public class CacheClientNotifier {
     return ccnSingleton;
   }
 
+  /**
+   * Factory method to construct a CacheClientNotifier {@code 
CacheClientNotifier} instance.
+   *
+   * @param cache The GemFire {@code InternalCache}
+   * @param clientRegistrationEventQueueManager Manages temporary registration 
queues for clients
+   * @return A {@code CacheClientNotifier} instance
+   */
+  public static synchronized CacheClientNotifier getInstance(InternalCache 
cache,
+      ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
+      StatisticsClock statisticsClock,
+      CacheServerStats acceptorStats,
+      int maximumMessageCount,
+      int messageTimeToLive,
+      ConnectionListener listener,
+      OverflowAttributes overflowAttributes,
+      boolean isGatewayReceiver) {
+    return getInstance(cache, clientRegistrationEventQueueManager, 
statisticsClock,
+        acceptorStats, maximumMessageCount, messageTimeToLive, listener, 
overflowAttributes,
+        isGatewayReceiver, new SocketMessageWriter());
+  }
+
   public static CacheClientNotifier getInstance() {
     return ccnSingleton;
   }
@@ -434,15 +450,19 @@ public class CacheClientNotifier {
       if (logger.isDebugEnabled()) {
         logger.debug("CacheClientNotifier: Successfully registered {}", 
cacheClientProxy);
       }
+      performPostAuthorization(cacheClientProxy, clientProxyMembershipID, 
member,
+          sysProps,
+          subjectOrPrincipal);
     } else {
+      try {
+        // prevent leak by closing socket
+        socket.close();
+      } catch (IOException ignore) {
+      }
       logger.warn(
           "CacheClientNotifier: Unsuccessfully registered client with 
identifier {} and response code {}",
           new Object[] {clientProxyMembershipID, responseByte});
     }
-
-    performPostAuthorization(cacheClientProxy, clientProxyMembershipID, member,
-        sysProps,
-        subjectOrPrincipal);
   }
 
   private void handleAuthenticationException(final ClientProxyMembershipID 
clientProxyMembershipID,
@@ -1708,7 +1728,9 @@ public class CacheClientNotifier {
       int messageTimeToLive,
       ConnectionListener listener,
       boolean isGatewayReceiver,
-      CacheClientProxyFactory cacheClientProxyFactory) {
+      CacheClientProxyFactory cacheClientProxyFactory,
+      SocketMessageWriter socketMessageWriter) {
+    this.socketMessageWriter = socketMessageWriter;
     this.cacheClientProxyFactory = cacheClientProxyFactory;
     // Set the Cache
     setCache(cache);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
index 11258da..ff20899 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.java
@@ -36,7 +36,6 @@ import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.RegionExistsException;
 import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.ClientServerObserver;
 import org.apache.geode.internal.cache.ClientServerObserverHolder;
 import org.apache.geode.internal.cache.Conflatable;
@@ -141,8 +140,8 @@ public class MessageDispatcher extends LoggingThread {
           
.putProxy(HARegionQueue.createRegionName(getProxy().getHARegionName()), 
getProxy());
       boolean createDurableQueue = proxy.proxyID.isDurable();
       boolean canHandleDelta = 
(proxy.getClientVersion().compareTo(Version.GFE_61) >= 0)
-          && 
InternalDistributedSystem.getAnyInstance().getConfig().getDeltaPropagation()
-          && !(this._proxy.clientConflation == Handshake.CONFLATION_ON);
+          && 
proxy.getCache().getInternalDistributedSystem().getConfig().getDeltaPropagation()
+          && !(proxy.clientConflation == Handshake.CONFLATION_ON);
       if ((createDurableQueue || canHandleDelta) && logger.isDebugEnabled()) {
         logger.debug("Creating a {} subscription queue for {}",
             createDurableQueue ? "durable" : "non-durable",
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
index 437c979..527f78f 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
@@ -34,6 +34,7 @@ import java.net.Socket;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -45,8 +46,12 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.geode.Statistics;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.query.internal.cq.ServerCQ;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EnumListenerEvent;
@@ -57,6 +62,7 @@ import org.apache.geode.internal.cache.InternalCacheEvent;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.RegionQueueException;
 import org.apache.geode.internal.statistics.StatisticsClock;
+import org.apache.geode.internal.statistics.StatisticsManager;
 import org.apache.geode.test.fake.Fakes;
 
 public class CacheClientNotifierTest {
@@ -343,4 +349,40 @@ public class CacheClientNotifierTest {
       cacheClientNotifier.shutdown(0);
     }
   }
+
+  @Test
+  public void registerClientInternalWithDuplicateDurableClientClosesSocket() 
throws Exception {
+    InternalCache internalCache = Fakes.cache();
+    when(internalCache.getCCPTimer())
+        .thenReturn(mock(SystemTimer.class));
+    InternalDistributedSystem internalDistributedSystem = 
mock(InternalDistributedSystem.class);
+    when(internalCache.getInternalDistributedSystem())
+        .thenReturn(internalDistributedSystem);
+    
when(internalCache.getDistributedSystem()).thenReturn(internalDistributedSystem);
+    
when(internalDistributedSystem.getProperties()).thenReturn(mock(Properties.class));
+    StatisticsManager statisticsManager = mock(StatisticsManager.class);
+    when(internalDistributedSystem.getStatisticsManager())
+        .thenReturn(statisticsManager);
+    Statistics statistics = mock(Statistics.class);
+    when(statisticsManager.createAtomicStatistics(any(), any()))
+        .thenReturn(statistics);
+    CacheClientNotifier cacheClientNotifier = 
CacheClientNotifier.getInstance(internalCache,
+        mock(ClientRegistrationEventQueueManager.class), 
mock(StatisticsClock.class),
+        mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class), 
null, false,
+        mock(SocketMessageWriter.class));
+    ClientRegistrationMetadata metadata = 
mock(ClientRegistrationMetadata.class);
+    ClientProxyMembershipID id = mock(ClientProxyMembershipID.class);
+    CacheClientProxy proxy = mock(CacheClientProxy.class);
+    when(proxy.getProxyID()).thenReturn(id);
+    when(metadata.getClientProxyMembershipID()).thenReturn(id);
+    when(id.getDistributedMember()).thenReturn(mock(DistributedMember.class));
+    when(id.getDurableId()).thenReturn("durable");
+    when(id.isDurable()).thenReturn(true);
+    cacheClientNotifier.addClientProxy(proxy);
+    Socket socket = mock(Socket.class);
+
+    cacheClientNotifier.registerClientInternal(metadata, socket, true, 0, 
false);
+
+    verify(socket).close();
+  }
 }

Reply via email to