This is an automated email from the ASF dual-hosted git repository.

onichols pushed a commit to branch release/1.10.0
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/release/1.10.0 by this push:
     new 920723c  GEODE-7089: Each client registration thread uses its own queue (#3976)
920723c is described below

commit 920723c5b56b9c00ccb2366adb5e4ba0f0f06927
Author: Ryan McMahon <[email protected]>
AuthorDate: Tue Aug 27 09:29:53 2019 -0700

    GEODE-7089: Each client registration thread uses its own queue (#3976)
    
    Co-authored-by: Ryan McMahon <[email protected]>
    Co-authored-by: Donal Evans <[email protected]>
    
    (cherry picked from commit 5d0153ad4adb1612a1083673f98b1982819a6589)
---
 .../cache/ha/HARegionQueueIntegrationTest.java     |  44 +--
 .../internal/cache/tier/sockets/AcceptorImpl.java  |  11 +-
 .../cache/tier/sockets/CacheClientNotifier.java    |  61 ++--
 .../ClientRegistrationEventQueueManager.java       | 357 ++++++++++-----------
 .../tier/sockets/ClientUpdateMessageImpl.java      |   4 -
 .../geode/internal/cache/BucketRegionTest.java     |  10 +-
 .../geode/internal/cache/CacheServerImplTest.java  |  40 ++-
 .../internal/cache/LocalRegionPartialMockTest.java |  11 +-
 .../cache/tier/sockets/AcceptorImplTest.java       |  10 +-
 .../tier/sockets/CacheClientNotifierTest.java      |  70 +++-
 .../ClientRegistrationEventQueueManagerTest.java   | 271 ++++++++++------
 11 files changed, 498 insertions(+), 391 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
index 6592dce..5e19f3f 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
@@ -20,7 +20,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -50,8 +49,6 @@ import org.mockito.MockitoAnnotations;
 import util.TestException;
 
 import org.apache.geode.CancelCriterion;
-import org.apache.geode.Statistics;
-import org.apache.geode.StatisticsType;
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
@@ -61,12 +58,7 @@ import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.Scope;
-import org.apache.geode.distributed.internal.DSClock;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.CacheServerImpl;
 import org.apache.geode.internal.cache.CachedDeserializable;
@@ -81,6 +73,7 @@ import org.apache.geode.internal.cache.VMCachedDeserializable;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
 import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import 
org.apache.geode.internal.cache.tier.sockets.ClientRegistrationEventQueueManager;
 import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessage;
 import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl;
 import org.apache.geode.internal.cache.tier.sockets.ConnectionListener;
@@ -108,7 +101,6 @@ public class HARegionQueueIntegrationTest {
     dataRegion = createDataRegion();
     ccn = createCacheClientNotifier();
     member = createMember();
-    HAContainerWrapper haContainerWrapper = (HAContainerWrapper) 
ccn.getHaContainer();
   }
 
   @After
@@ -125,38 +117,12 @@ public class HARegionQueueIntegrationTest {
     return cache.createRegionFactory(RegionShortcut.REPLICATE).create("data");
   }
 
-  private InternalCache createMockInternalCache() {
-    InternalCache mockInternalCache = mock(InternalCache.class);
-    doReturn(mock(SystemTimer.class)).when(mockInternalCache).getCCPTimer();
-    
doReturn(mock(CancelCriterion.class)).when(mockInternalCache).getCancelCriterion();
-
-    InternalDistributedSystem mockInteralDistributedSystem = 
createMockInternalDistributedSystem();
-    
doReturn(mockInteralDistributedSystem).when(mockInternalCache).getInternalDistributedSystem();
-    
doReturn(mockInteralDistributedSystem).when(mockInternalCache).getDistributedSystem();
-
-    return mockInternalCache;
-  }
-
-  private InternalDistributedSystem createMockInternalDistributedSystem() {
-    InternalDistributedSystem mockInternalDistributedSystem =
-        mock(InternalDistributedSystem.class);
-    DistributionManager mockDistributionManager = 
mock(DistributionManager.class);
-
-    
doReturn(mock(InternalDistributedMember.class)).when(mockInternalDistributedSystem)
-        .getDistributedMember();
-    doReturn(mock(Statistics.class)).when(mockInternalDistributedSystem)
-        .createAtomicStatistics(any(StatisticsType.class), any(String.class));
-    
doReturn(mock(DistributionConfig.class)).when(mockDistributionManager).getConfig();
-    
doReturn(mockDistributionManager).when(mockInternalDistributedSystem).getDistributionManager();
-    
doReturn(mock(DSClock.class)).when(mockInternalDistributedSystem).getClock();
-
-    return mockInternalDistributedSystem;
-  }
-
   private CacheClientNotifier createCacheClientNotifier() {
     CacheClientNotifier ccn =
-        CacheClientNotifier.getInstance((InternalCache) cache, 
mock(CacheServerStats.class),
-            100000, 100000, mock(ConnectionListener.class), null, false);
+        CacheClientNotifier.getInstance((InternalCache) cache,
+            mock(ClientRegistrationEventQueueManager.class), 
mock(CacheServerStats.class),
+            100000, 100000, mock(ConnectionListener.class),
+            null, false);
     return ccn;
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 8fdd78a..ed34572 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -400,15 +400,14 @@ public class AcceptorImpl implements Acceptor, Runnable {
    * @param maxConnections the maximum number of connections allowed in the 
server pool
    * @param maxThreads the maximum number of threads allowed in the server pool
    * @param securityService the SecurityService to use for authentication and 
authorization
-   * @param gatewayReceiver the GatewayReceiver that will use this 
AcceptorImpl instance
-   * @param gatewayReceiverMetrics the GatewayReceiverMetrics to use for 
exposing metrics
    * @param gatewayTransportFilters List of GatewayTransportFilters
    */
   AcceptorImpl(final int port, final String bindHostName, final boolean 
notifyBySubscription,
       final int socketBufferSize, final int maximumTimeBetweenPings,
       final InternalCache internalCache, final int maxConnections, final int 
maxThreads,
       final int maximumMessageCount, final int messageTimeToLive,
-      final ConnectionListener connectionListener, final OverflowAttributes 
overflowAttributes,
+      final ConnectionListener connectionListener,
+      final OverflowAttributes overflowAttributes,
       final boolean tcpNoDelay, final ServerConnectionFactory 
serverConnectionFactory,
       final long timeLimitMillis, final SecurityService securityService,
       final Supplier<SocketCreator> socketCreatorSupplier,
@@ -609,8 +608,10 @@ public class AcceptorImpl implements Acceptor, Runnable {
     cache = internalCache;
     crHelper = new CachedRegionHelper(cache);
 
-    clientNotifier = cacheClientNotifierProvider.get(internalCache, stats, 
maximumMessageCount,
-        messageTimeToLive, this.connectionListener, overflowAttributes, 
isGatewayReceiver());
+    clientNotifier =
+        cacheClientNotifierProvider.get(internalCache, new 
ClientRegistrationEventQueueManager(),
+            stats, maximumMessageCount, messageTimeToLive, 
this.connectionListener,
+            overflowAttributes, isGatewayReceiver());
 
     this.socketBufferSize = socketBufferSize;
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index 090d2b2..3c8134f 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -40,7 +40,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 
@@ -126,22 +125,27 @@ public class CacheClientNotifier {
   private static volatile CacheClientNotifier ccnSingleton;
 
   private final SocketMessageWriter socketMessageWriter = new 
SocketMessageWriter();
-  private final ClientRegistrationEventQueueManager registrationQueueManager =
-      new ClientRegistrationEventQueueManager();
+  private final ClientRegistrationEventQueueManager 
clientRegistrationEventQueueManager;
 
   /**
    * Factory method to construct a CacheClientNotifier {@code 
CacheClientNotifier} instance.
    *
    * @param cache The GemFire {@code InternalCache}
+   * @param clientRegistrationEventQueueManager Manages temporary registration 
queues for clients
    * @return A {@code CacheClientNotifier} instance
    */
   public static synchronized CacheClientNotifier getInstance(InternalCache 
cache,
-      CacheServerStats acceptorStats, int maximumMessageCount, int 
messageTimeToLive,
-      ConnectionListener listener, OverflowAttributes overflowAttributes,
+      ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
+      CacheServerStats acceptorStats,
+      int maximumMessageCount,
+      int messageTimeToLive,
+      ConnectionListener listener,
+      OverflowAttributes overflowAttributes,
       boolean isGatewayReceiver) {
     if (ccnSingleton == null) {
-      ccnSingleton = new CacheClientNotifier(cache, acceptorStats, 
maximumMessageCount,
-          messageTimeToLive, listener, isGatewayReceiver);
+      ccnSingleton =
+          new CacheClientNotifier(cache, clientRegistrationEventQueueManager, 
acceptorStats,
+              maximumMessageCount, messageTimeToLive, listener, 
isGatewayReceiver);
     }
 
     if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
@@ -177,16 +181,16 @@ public class CacheClientNotifier {
 
     try {
       if (isClientPermitted(clientRegistrationMetadata, 
clientProxyMembershipID)) {
-        registrationQueueManager.create(clientProxyMembershipID, new 
ConcurrentLinkedQueue<>(),
-            new ReentrantReadWriteLock(), new ReentrantLock());
+        ClientRegistrationEventQueueManager.ClientRegistrationEventQueue 
clientRegistrationEventQueue =
+            clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+                new ConcurrentLinkedQueue<>(),
+                new ReentrantReadWriteLock());
 
         try {
           registerClientInternal(clientRegistrationMetadata, socket, 
isPrimary, acceptorId,
               notifyBySubscription);
         } finally {
-          if (isProxyInitialized(clientProxyMembershipID)) {
-            registrationQueueManager.drain(clientProxyMembershipID, this);
-          }
+          
clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, this);
         }
       }
     } catch (AuthenticationRequiredException ex) {
@@ -288,8 +292,8 @@ public class CacheClientNotifier {
         }
         cacheClientProxy =
             new CacheClientProxy(this, socket, clientProxyMembershipID, 
isPrimary, clientConflation,
-                clientVersion,
-                acceptorId, notifyBySubscription, cache.getSecurityService(), 
subject);
+                clientVersion, acceptorId, notifyBySubscription, 
cache.getSecurityService(),
+                subject);
         successful = initializeProxy(cacheClientProxy);
       } else {
         cacheClientProxy.setSubject(subject);
@@ -681,7 +685,7 @@ public class CacheClientNotifier {
       conflatable = wrapper;
     }
 
-    registrationQueueManager.add(event, conflatable, filterClients, this);
+    clientRegistrationEventQueueManager.add(event, conflatable, filterClients, 
this);
 
     singletonRouteClientMessage(conflatable, filterClients);
 
@@ -1185,16 +1189,6 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Determines whether a client proxy has been initialized
-   *
-   * @param clientProxyMembershipID The client proxy membership ID
-   * @return Whether the client proxy is initialized
-   */
-  private boolean isProxyInitialized(final ClientProxyMembershipID 
clientProxyMembershipID) {
-    return getClientProxy(clientProxyMembershipID) != null;
-  }
-
-  /**
    * Returns the {@code CacheClientProxy} associated to the membershipID *
    *
    * @return the {@code CacheClientProxy} associated to the membershipID
@@ -1696,11 +1690,14 @@ public class CacheClientNotifier {
    * @param cache The GemFire {@code InternalCache}
    * @param listener a listener which should receive notifications abouts 
queues being added or
    */
-  private CacheClientNotifier(InternalCache cache, CacheServerStats 
acceptorStats,
-      int maximumMessageCount, int messageTimeToLive, ConnectionListener 
listener,
-      boolean isGatewayReceiver) {
+  private CacheClientNotifier(InternalCache cache,
+      ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
+      CacheServerStats acceptorStats, int maximumMessageCount,
+      int messageTimeToLive,
+      ConnectionListener listener, boolean isGatewayReceiver) {
     // Set the Cache
     setCache(cache);
+    this.clientRegistrationEventQueueManager = 
clientRegistrationEventQueueManager;
     this.acceptorStats = acceptorStats;
     // we only need one thread per client and wait 50ms for close
     socketCloser = new SocketCloser(1, 50);
@@ -2095,9 +2092,11 @@ public class CacheClientNotifier {
   @FunctionalInterface
   @VisibleForTesting
   public interface CacheClientNotifierProvider {
-    CacheClientNotifier get(InternalCache cache, CacheServerStats 
acceptorStats,
-        int maximumMessageCount, int messageTimeToLive, ConnectionListener 
listener,
-        OverflowAttributes overflowAttributes, boolean isGatewayReceiver);
+    CacheClientNotifier get(InternalCache cache,
+        ClientRegistrationEventQueueManager 
clientRegistrationEventQueueManager,
+        CacheServerStats acceptorStats, int maximumMessageCount, int 
messageTimeToLive,
+        ConnectionListener listener, OverflowAttributes overflowAttributes,
+        boolean isGatewayReceiver);
   }
 
   @VisibleForTesting
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
index 02df497..8216de6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
@@ -15,12 +15,9 @@
 
 package org.apache.geode.internal.cache.tier.sockets;
 
-import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.logging.log4j.Logger;
 
@@ -30,6 +27,7 @@ import org.apache.geode.internal.cache.FilterProfile;
 import org.apache.geode.internal.cache.FilterRoutingInfo;
 import org.apache.geode.internal.cache.InternalCacheEvent;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.concurrent.ConcurrentHashSet;
 import org.apache.geode.internal.logging.LogService;
 
 /**
@@ -37,10 +35,10 @@ import org.apache.geode.internal.logging.LogService;
  * client is completely registered (filter info retrieved and GII complete), 
we will drain the
  * client's queued events and deliver them to the cache client proxy if 
necessary.
  */
-class ClientRegistrationEventQueueManager {
+public class ClientRegistrationEventQueueManager {
   private static final Logger logger = LogService.getLogger();
-  private final Map<ClientProxyMembershipID, ClientRegistrationEventQueue> 
registeringProxyEventQueues =
-      new ConcurrentHashMap<>();
+  private final Set<ClientRegistrationEventQueue> registeringProxyEventQueues =
+      new ConcurrentHashSet<>();
 
   void add(final InternalCacheEvent event,
       final Conflatable conflatable,
@@ -52,13 +50,7 @@ class ClientRegistrationEventQueueManager {
     ClientRegistrationEvent clientRegistrationEvent =
         new ClientRegistrationEvent(event, conflatable);
 
-    for (final Map.Entry<ClientProxyMembershipID, 
ClientRegistrationEventQueue> eventsReceivedWhileRegisteringClient : 
registeringProxyEventQueues
-        .entrySet()) {
-      ClientProxyMembershipID clientProxyMembershipID =
-          eventsReceivedWhileRegisteringClient.getKey();
-      ClientRegistrationEventQueue registrationEventQueue =
-          eventsReceivedWhileRegisteringClient.getValue();
-
+    for (final ClientRegistrationEventQueue registrationEventQueue : 
registeringProxyEventQueues) {
       registrationEventQueue.lockForPutting();
       try {
         // If this is an HAEventWrapper we need to increment the PutInProgress 
counter so
@@ -69,10 +61,13 @@ class ClientRegistrationEventQueueManager {
           ((HAEventWrapper) conflatable).incrementPutInProgressCounter("client 
registration");
         }
 
+        ClientProxyMembershipID clientProxyMembershipID =
+            registrationEventQueue.getClientProxyMembershipID();
+
         // After taking out the lock, we need to determine if the client is 
still actually
         // registering since there is a small race where it may have finished 
registering
         // after we pulled the queue out of the registeringProxyEventQueues 
collection
-        if (registeringProxyEventQueues.containsKey(clientProxyMembershipID)) {
+        if (registeringProxyEventQueues.contains(registrationEventQueue)) {
           // If the event value is off-heap, copy it to heap so we are 
guaranteed the value
           // is available when we drain the registration queue
           copyOffHeapToHeapForRegistrationQueue(event);
@@ -90,7 +85,10 @@ class ClientRegistrationEventQueueManager {
           // this event and potentially deliver the conflatable to handle the 
edge case where
           // the original client filter IDs generated in the "normal" put 
processing path did
           // not include the registering client because the filter info was 
not yet available.
-          processEventAndDeliverConflatable(clientProxyMembershipID, 
cacheClientNotifier, event,
+          CacheClientProxy cacheClientProxy =
+              cacheClientNotifier.getClientProxy(clientProxyMembershipID);
+
+          processEventAndDeliverConflatable(cacheClientProxy, 
cacheClientNotifier, event,
               conflatable, originalFilterClientIDs);
         }
       } finally {
@@ -99,6 +97,132 @@ class ClientRegistrationEventQueueManager {
     }
   }
 
+  void drain(final ClientRegistrationEventQueue clientRegistrationEventQueue,
+      final CacheClientNotifier cacheClientNotifier) {
+    try {
+      ClientProxyMembershipID clientProxyMembershipID =
+          clientRegistrationEventQueue.getClientProxyMembershipID();
+
+      if (logger.isDebugEnabled()) {
+        logger.debug("Draining events from registration queue for client proxy 
"
+            + clientProxyMembershipID
+            + " without synchronization");
+      }
+
+      CacheClientProxy cacheClientProxy = cacheClientNotifier
+          .getClientProxy(clientProxyMembershipID);
+
+      drainEventsReceivedWhileRegisteringClient(
+          cacheClientProxy,
+          clientRegistrationEventQueue,
+          cacheClientNotifier);
+
+      // Prevents additional events from being added to the queue while we 
process and remove it
+      clientRegistrationEventQueue.lockForDraining();
+
+      if (logger.isDebugEnabled()) {
+        logger.debug("Draining remaining events from registration queue for 
client proxy "
+            + clientProxyMembershipID
+            + " with synchronization");
+      }
+
+      drainEventsReceivedWhileRegisteringClient(
+          cacheClientProxy,
+          clientRegistrationEventQueue,
+          cacheClientNotifier);
+    } finally {
+      // The queue must be removed before attempting to release the drain lock
+      // so that no additional events can be added from add threads.
+      registeringProxyEventQueues.remove(clientRegistrationEventQueue);
+
+      if (clientRegistrationEventQueue.isLockForDrainingHeld()) {
+        clientRegistrationEventQueue.unlockForDraining();
+      }
+    }
+  }
+
+  ClientRegistrationEventQueue create(
+      final ClientProxyMembershipID clientProxyMembershipID,
+      final Queue<ClientRegistrationEvent> eventQueue,
+      final ReentrantReadWriteLock eventAddDrainLock) {
+    final ClientRegistrationEventQueue clientRegistrationEventQueue =
+        new ClientRegistrationEventQueue(clientProxyMembershipID, eventQueue,
+            eventAddDrainLock);
+    registeringProxyEventQueues.add(clientRegistrationEventQueue);
+    return clientRegistrationEventQueue;
+  }
+
+  private void processEventAndDeliverConflatable(final CacheClientProxy 
cacheClientProxy,
+      final CacheClientNotifier cacheClientNotifier,
+      final InternalCacheEvent internalCacheEvent,
+      final Conflatable conflatable,
+      final Set<ClientProxyMembershipID> originalFilterClientIDs) {
+    try {
+      // If the cache client proxy is null, the registration was not 
successful and the proxy
+      // was never added to the initialized proxy collection managed by the 
cache client notifier.
+      // If that is the case, we can just decrement the put in progress 
counter on the conflatable
+      // if it is an HAEventWrapper.
+      if (cacheClientProxy != null) {
+        // The first step is to repopulate the filter info for the event to 
determine if
+        // the client which was registering has a matching CQ or has 
registered interest
+        // in the key for this event. We need to get the filter profile, 
filter routing info,
+        // and local filter info in order to do so. If any of these are null, 
then there is
+        // no need to proceed as the client is not interested.
+        FilterProfile filterProfile =
+            ((LocalRegion) internalCacheEvent.getRegion()).getFilterProfile();
+
+        if (filterProfile != null) {
+          FilterRoutingInfo filterRoutingInfo =
+              filterProfile.getFilterRoutingInfoPart2(null, 
internalCacheEvent);
+
+          if (filterRoutingInfo != null) {
+            FilterRoutingInfo.FilterInfo filterInfo = 
filterRoutingInfo.getLocalFilterInfo();
+
+            if (filterInfo != null) {
+              ClientUpdateMessageImpl clientUpdateMessage = conflatable 
instanceof HAEventWrapper
+                  ? (ClientUpdateMessageImpl) ((HAEventWrapper) conflatable)
+                      .getClientUpdateMessage()
+                  : (ClientUpdateMessageImpl) conflatable;
+
+              internalCacheEvent.setLocalFilterInfo(filterInfo);
+
+              Set<ClientProxyMembershipID> newFilterClientIDs =
+                  cacheClientNotifier.getFilterClientIDs(internalCacheEvent, 
filterProfile,
+                      filterInfo,
+                      clientUpdateMessage);
+
+              ClientProxyMembershipID proxyID = cacheClientProxy.getProxyID();
+              if (eventNotInOriginalFilterClientIDs(proxyID, 
newFilterClientIDs,
+                  originalFilterClientIDs) && 
newFilterClientIDs.contains(proxyID)) {
+                cacheClientProxy.deliverMessage(conflatable);
+              }
+            }
+          }
+        }
+      }
+    } finally {
+      // Once we have processed the conflatable, if it is an HAEventWrapper we 
can
+      // decrement the PutInProgress counter, allowing the ClientUpdateMessage 
to be
+      // set to null. See decrementPutInProgressCounter() for more details.
+      if (conflatable instanceof HAEventWrapper) {
+        ((HAEventWrapper) conflatable).decrementPutInProgressCounter();
+      }
+    }
+  }
+
+  /*
+   * This is to handle the edge case where the original filter client IDs
+   * calculated by "normal" put processing did not include the registering 
client
+   * because the filter info had not been received yet, but we now found that 
the client
+   * is interested in the event so we should deliver it.
+   */
+  private boolean eventNotInOriginalFilterClientIDs(final 
ClientProxyMembershipID proxyID,
+      final Set<ClientProxyMembershipID> newFilterClientIDs,
+      final Set<ClientProxyMembershipID> originalFilterClientIDs) {
+    return originalFilterClientIDs == null
+        || (!originalFilterClientIDs.contains(proxyID) && 
newFilterClientIDs.contains(proxyID));
+  }
+
   /**
    * For simplicity, we will copy off-heap registration queue values to heap 
to avoid
    * complicated off-heap reference counting. Since the registration queue is 
only a
@@ -113,93 +237,54 @@ class ClientRegistrationEventQueueManager {
     }
   }
 
-  void drain(final ClientProxyMembershipID clientProxyMembershipID,
-      final CacheClientNotifier cacheClientNotifier) {
-    ClientRegistrationEventQueue registrationEventQueue =
-        registeringProxyEventQueues.get(clientProxyMembershipID);
-
-    if (registrationEventQueue != null) {
-      // It is possible that several client registration threads are active 
for the same
-      // ClientProxyMembershipID, in which case we only want a single drainer 
to drain
-      // and remove the queue.
-      registrationEventQueue.lockForSingleDrainer();
-      try {
-        // See if the queue is still available after acquiring the lock as it 
may have
-        // been removed from registeringProxyEventQueues by the previous thread
-        if (registeringProxyEventQueues.containsKey(clientProxyMembershipID)) {
-          // As an optimization, we drain as many events from the queue as we 
can
-          // before taking out a lock to drain the remaining events. When we 
lock for draining,
-          // it prevents additional events from being added to the queue while 
the queue is drained
-          // and removed.
-          if (logger.isDebugEnabled()) {
-            logger.debug("Draining events from registration queue for client 
proxy "
-                + clientProxyMembershipID
-                + " without synchronization");
-          }
-
-          drainEventsReceivedWhileRegisteringClient(clientProxyMembershipID, 
registrationEventQueue,
-              cacheClientNotifier);
-
-          // Prevents additional events from being added to the queue while we 
process and remove it
-          registrationEventQueue.lockForDraining();
-          try {
-            if (logger.isDebugEnabled()) {
-              logger.debug("Draining remaining events from registration queue 
for client proxy "
-                  + clientProxyMembershipID + " with synchronization");
-            }
-
-            drainEventsReceivedWhileRegisteringClient(clientProxyMembershipID,
-                registrationEventQueue,
-                cacheClientNotifier);
-
-            registeringProxyEventQueues.remove(clientProxyMembershipID);
-          } finally {
-            registrationEventQueue.unlockForDraining();
-          }
-        }
-      } finally {
-        registrationEventQueue.unlockForSingleDrainer();
-      }
-    }
-  }
-
-  private void drainEventsReceivedWhileRegisteringClient(final 
ClientProxyMembershipID proxyID,
+  private void drainEventsReceivedWhileRegisteringClient(
+      final CacheClientProxy cacheClientProxy,
       final ClientRegistrationEventQueue registrationEventQueue,
       final CacheClientNotifier cacheClientNotifier) {
     ClientRegistrationEvent queuedEvent;
     while ((queuedEvent = registrationEventQueue.poll()) != null) {
       InternalCacheEvent internalCacheEvent = queuedEvent.internalCacheEvent;
       Conflatable conflatable = queuedEvent.conflatable;
-      processEventAndDeliverConflatable(proxyID, cacheClientNotifier, 
internalCacheEvent,
-          conflatable, null);
+      processEventAndDeliverConflatable(cacheClientProxy,
+          cacheClientNotifier, internalCacheEvent, conflatable, null);
     }
   }
 
-  public ClientRegistrationEventQueue create(
-      final ClientProxyMembershipID clientProxyMembershipID,
-      final Queue<ClientRegistrationEvent> eventQueue,
-      final ReadWriteLock eventAddDrainLock,
-      final ReentrantLock singleDrainerLock) {
-    final ClientRegistrationEventQueue clientRegistrationEventQueue =
-        new ClientRegistrationEventQueue(eventQueue,
-            eventAddDrainLock, singleDrainerLock);
-    registeringProxyEventQueues.putIfAbsent(clientProxyMembershipID,
-        clientRegistrationEventQueue);
-    return clientRegistrationEventQueue;
+  /**
+   * Represents a conflatable and event processed while a client was 
registering.
+   * This needs to be queued and processed after registration is complete. The 
conflatable
+   * is what we will actually be delivering to the MessageDispatcher (and 
thereby adding
+   * to the HARegionQueue). The internal cache event is required to rehydrate 
the filter
+   * info and determine if the client which was registering does have a CQ 
that matches or
+   * has registered interest in the key.
+   */
+  private class ClientRegistrationEvent {
+    private final Conflatable conflatable;
+    private final InternalCacheEvent internalCacheEvent;
+
+    ClientRegistrationEvent(final InternalCacheEvent internalCacheEvent,
+        final Conflatable conflatable) {
+      this.conflatable = conflatable;
+      this.internalCacheEvent = internalCacheEvent;
+    }
   }
 
   class ClientRegistrationEventQueue {
+    private final ClientProxyMembershipID clientProxyMembershipID;
     private final Queue<ClientRegistrationEvent> eventQueue;
-    private final ReadWriteLock eventAddDrainLock;
-    private final ReentrantLock singleDrainerLock;
+    private final ReentrantReadWriteLock eventAddDrainLock;
 
     ClientRegistrationEventQueue(
+        final ClientProxyMembershipID clientProxyMembershipID,
         final Queue<ClientRegistrationEvent> eventQueue,
-        final ReadWriteLock eventAddDrainLock,
-        final ReentrantLock singleDrainerLock) {
+        final ReentrantReadWriteLock eventAddDrainLock) {
+      this.clientProxyMembershipID = clientProxyMembershipID;
       this.eventQueue = eventQueue;
       this.eventAddDrainLock = eventAddDrainLock;
-      this.singleDrainerLock = singleDrainerLock;
+    }
+
+    public ClientProxyMembershipID getClientProxyMembershipID() {
+      return clientProxyMembershipID;
     }
 
     boolean isEmpty() {
@@ -222,6 +307,10 @@ class ClientRegistrationEventQueueManager {
       eventAddDrainLock.writeLock().unlock();
     }
 
+    private boolean isLockForDrainingHeld() {
+      return eventAddDrainLock.writeLock().isHeldByCurrentThread();
+    }
+
     private void lockForPutting() {
       eventAddDrainLock.readLock().lock();
     }
@@ -229,103 +318,5 @@ class ClientRegistrationEventQueueManager {
     private void unlockForPutting() {
       eventAddDrainLock.readLock().unlock();
     }
-
-    private void lockForSingleDrainer() {
-      singleDrainerLock.lock();
-    }
-
-    private void unlockForSingleDrainer() {
-      singleDrainerLock.unlock();
-    }
-  }
-
-  private void processEventAndDeliverConflatable(final ClientProxyMembershipID 
proxyID,
-      final CacheClientNotifier cacheClientNotifier,
-      final InternalCacheEvent internalCacheEvent,
-      final Conflatable conflatable,
-      final Set<ClientProxyMembershipID> originalFilterClientIDs) {
-    // The first step is to repopulate the filter info for the event to determine if
-    // the client which was registering has a matching CQ or has registered interest
-    // in the key for this event. We need to get the filter profile, filter routing info,
-    // and local filter info in order to do so. If any of these are null, then there is
-    // no need to proceed as the client is not interested.
-    FilterProfile filterProfile =
-        ((LocalRegion) internalCacheEvent.getRegion()).getFilterProfile();
-
-    if (filterProfile != null) {
-      FilterRoutingInfo filterRoutingInfo =
-          filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent);
-
-      if (filterRoutingInfo != null) {
-        FilterRoutingInfo.FilterInfo filterInfo = 
filterRoutingInfo.getLocalFilterInfo();
-
-        if (filterInfo != null) {
-          ClientUpdateMessageImpl clientUpdateMessage = conflatable instanceof 
HAEventWrapper
-              ? (ClientUpdateMessageImpl) ((HAEventWrapper) 
conflatable).getClientUpdateMessage()
-              : (ClientUpdateMessageImpl) conflatable;
-
-          internalCacheEvent.setLocalFilterInfo(filterInfo);
-
-          Set<ClientProxyMembershipID> newFilterClientIDs =
-              cacheClientNotifier.getFilterClientIDs(internalCacheEvent, 
filterProfile,
-                  filterInfo,
-                  clientUpdateMessage);
-
-          if (eventNotInOriginalFilterClientIDs(proxyID, newFilterClientIDs,
-              originalFilterClientIDs)) {
-            CacheClientProxy cacheClientProxy = 
cacheClientNotifier.getClientProxy(proxyID);
-
-            if (eventShouldBeDelivered(proxyID, newFilterClientIDs, 
cacheClientProxy)) {
-              cacheClientProxy.deliverMessage(conflatable);
-            }
-          }
-        }
-      }
-    }
-
-    // Once we have processed the conflatable, if it is an HAEventWrapper we can
-    // decrement the PutInProgress counter, allowing the ClientUpdateMessage to be
-    // set to null. See decrementPutInProgressCounter() for more details.
-    if (conflatable instanceof HAEventWrapper) {
-      ((HAEventWrapper) conflatable).decrementPutInProgressCounter();
-    }
-  }
-
-  private boolean eventShouldBeDelivered(final ClientProxyMembershipID proxyID,
-      final Set<ClientProxyMembershipID> filterClientIDs,
-      final CacheClientProxy cacheClientProxy) {
-    return filterClientIDs.contains(proxyID) && cacheClientProxy != null;
-  }
-
-  /*
-   * This is to handle the edge case where the original filter client IDs
-   * calculated by "normal" put processing did not include the registering client
-   * because the filter info had not been received yet, but we now found that the client
-   * is interested in the event so we should deliver it.
-   */
-  private boolean eventNotInOriginalFilterClientIDs(final 
ClientProxyMembershipID proxyID,
-      final Set<ClientProxyMembershipID> newFilterClientIDs,
-      final Set<ClientProxyMembershipID> originalFilterClientIDs) {
-    return originalFilterClientIDs == null
-        || (!originalFilterClientIDs.contains(proxyID) && 
newFilterClientIDs.contains(proxyID));
-  }
-
-  /**
-   * Represents a conflatable and event processed while a client was registering.
-   * This needs to be queued and processed after registration is complete. The conflatable
-   * is what we will actually be delivering to the MessageDispatcher (and thereby adding
-   * to the HARegionQueue). The internal cache event is required to rehydrate the filter
-   * info and determine if the client which was registering does have a CQ that matches or
-   * has registered interest in the key.
-   */
-  private class ClientRegistrationEvent {
-    private final Conflatable conflatable;
-    private final InternalCacheEvent internalCacheEvent;
-
-    ClientRegistrationEvent(final InternalCacheEvent internalCacheEvent,
-        final Conflatable conflatable) {
-      this.conflatable = conflatable;
-      this.internalCacheEvent = internalCacheEvent;
-    }
   }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
index 33ec3aa..731c952 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
@@ -171,8 +171,6 @@ public class ClientUpdateMessageImpl implements 
ClientUpdateMessage, Sizeable, N
   public ClientUpdateMessageImpl(EnumListenerEvent operation, LocalRegion 
region,
       Object keyOfInterest, Object value, byte[] delta, byte valueIsObject, 
Object callbackArgument,
       ClientProxyMembershipID memberId, EventID eventIdentifier, VersionTag 
versionTag) {
-    // this._clientInterestList = new HashSet();
-    // this._clientInterestListInv = new HashSet();
     this._operation = operation;
     this._regionName = region.getFullPath();
     this._keyOfInterest = keyOfInterest;
@@ -196,8 +194,6 @@ public class ClientUpdateMessageImpl implements 
ClientUpdateMessage, Sizeable, N
    */
   protected ClientUpdateMessageImpl(EnumListenerEvent operation, 
ClientProxyMembershipID memberId,
       EventID eventIdentifier) {
-    // this._clientInterestList = new HashSet();
-    // this._clientInterestListInv = new HashSet();
     this._operation = operation;
     this._membershipId = memberId;
     this._eventIdentifier = eventIdentifier;
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionTest.java
index 4708071..0b41015 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionTest.java
@@ -49,6 +49,7 @@ import 
org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
 import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import 
org.apache.geode.internal.cache.tier.sockets.ClientRegistrationEventQueueManager;
 import org.apache.geode.internal.cache.tier.sockets.ConnectionListener;
 import org.apache.geode.test.fake.Fakes;
 
@@ -342,8 +343,9 @@ public class BucketRegionTest {
     doReturn(mock(SystemTimer.class)).when(cache).getCCPTimer();
 
     CacheClientNotifier ccn =
-        CacheClientNotifier.getInstance(cache, mock(CacheServerStats.class), 
10,
-            10, mock(ConnectionListener.class), null, true);
+        CacheClientNotifier.getInstance(cache, 
mock(ClientRegistrationEventQueueManager.class),
+            mock(CacheServerStats.class), 10, 10,
+            mock(ConnectionListener.class), null, true);
 
     bucketRegion.notifyClientsOfTombstoneGC(regionGCVersions, keysRemoved, 
eventID, routing);
     verify(bucketRegion, never()).getFilterProfile();
@@ -365,8 +367,8 @@ public class BucketRegionTest {
     doReturn(mock(SystemTimer.class)).when(cache).getCCPTimer();
 
     CacheClientNotifier ccn =
-        CacheClientNotifier.getInstance(cache, mock(CacheServerStats.class), 
10,
-            10, mock(ConnectionListener.class), null, true);
+        CacheClientNotifier.getInstance(cache, 
mock(ClientRegistrationEventQueueManager.class),
+            mock(CacheServerStats.class), 10, 10, 
mock(ConnectionListener.class), null, true);
 
     doReturn(mock(ClientProxyMembershipID.class)).when(proxy).getProxyID();
     ccn.addClientProxyToMap(proxy);
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheServerImplTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheServerImplTest.java
index b8d5362..9a99b94 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheServerImplTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheServerImplTest.java
@@ -88,8 +88,9 @@ public class CacheServerImplTest {
   @Test
   public void createdAcceptorIsGatewayEndpoint() throws IOException {
     OverflowAttributes overflowAttributes = mock(OverflowAttributes.class);
-    InternalCacheServer server = new CacheServerImpl(cache, securityService, 
new AcceptorBuilder(),
-        true, true, () -> socketCreator, (a, b, c, d, e, f, g) -> 
cacheClientNotifier,
+    InternalCacheServer server = new CacheServerImpl(cache, securityService,
+        new AcceptorBuilder(), true, true,
+        () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
         (a, b, c) -> clientHealthMonitor, a -> advisor);
 
     Acceptor acceptor = server.createAcceptor(overflowAttributes);
@@ -99,8 +100,9 @@ public class CacheServerImplTest {
 
   @Test
   public void getGroups_returnsSpecifiedGroup() {
-    InternalCacheServer server = new CacheServerImpl(cache, securityService, 
new AcceptorBuilder(),
-        true, true, () -> socketCreator, (a, b, c, d, e, f, g) -> 
cacheClientNotifier,
+    InternalCacheServer server = new CacheServerImpl(cache, securityService,
+        new AcceptorBuilder(), true, true,
+        () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
         (a, b, c) -> clientHealthMonitor, a -> advisor);
     String specifiedGroup = "group0";
 
@@ -112,8 +114,9 @@ public class CacheServerImplTest {
 
   @Test
   public void getGroups_returnsMultipleSpecifiedGroups() {
-    InternalCacheServer server = new CacheServerImpl(cache, securityService, 
new AcceptorBuilder(),
-        true, true, () -> socketCreator, (a, b, c, d, e, f, g) -> 
cacheClientNotifier,
+    InternalCacheServer server = new CacheServerImpl(cache, securityService,
+        new AcceptorBuilder(), true, true,
+        () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
         (a, b, c) -> clientHealthMonitor, a -> advisor);
     String specifiedGroup1 = "group1";
     String specifiedGroup2 = "group2";
@@ -129,8 +132,9 @@ public class CacheServerImplTest {
   public void getCombinedGroups_includesMembershipGroup() {
     String membershipGroup = "group-m0";
     when(config.getGroups()).thenReturn(membershipGroup);
-    InternalCacheServer server = new CacheServerImpl(cache, securityService, 
new AcceptorBuilder(),
-        true, true, () -> socketCreator, (a, b, c, d, e, f, g) -> 
cacheClientNotifier,
+    InternalCacheServer server = new CacheServerImpl(cache, securityService,
+        new AcceptorBuilder(), true, true,
+        () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
         (a, b, c) -> clientHealthMonitor, a -> advisor);
 
     assertThat(server.getCombinedGroups())
@@ -144,8 +148,9 @@ public class CacheServerImplTest {
     String membershipGroup3 = "group-m3";
     when(config.getGroups())
         .thenReturn(membershipGroup1 + "," + membershipGroup2 + "," + 
membershipGroup3);
-    InternalCacheServer server = new CacheServerImpl(cache, securityService, 
new AcceptorBuilder(),
-        true, true, () -> socketCreator, (a, b, c, d, e, f, g) -> 
cacheClientNotifier,
+    InternalCacheServer server = new CacheServerImpl(cache, securityService,
+        new AcceptorBuilder(), true, true,
+        () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
         (a, b, c) -> clientHealthMonitor, a -> advisor);
 
     assertThat(server.getCombinedGroups())
@@ -159,8 +164,9 @@ public class CacheServerImplTest {
     String membershipGroup3 = "group-m3";
     when(config.getGroups())
         .thenReturn(membershipGroup1 + "," + membershipGroup2 + "," + 
membershipGroup3);
-    InternalCacheServer server = new CacheServerImpl(cache, securityService, 
new AcceptorBuilder(),
-        true, true, () -> socketCreator, (a, b, c, d, e, f, g) -> 
cacheClientNotifier,
+    InternalCacheServer server = new CacheServerImpl(cache, securityService,
+        new AcceptorBuilder(), true, true,
+        () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
         (a, b, c) -> clientHealthMonitor, a -> advisor);
     String specifiedGroup1 = "group1";
     String specifiedGroup2 = "group2";
@@ -175,8 +181,9 @@ public class CacheServerImplTest {
 
   @Test
   public void startNotifiesResourceEventCacheServerStart() throws IOException {
-    InternalCacheServer server = new CacheServerImpl(cache, securityService, 
new AcceptorBuilder(),
-        true, true, () -> socketCreator, (a, b, c, d, e, f, g) -> 
cacheClientNotifier,
+    InternalCacheServer server = new CacheServerImpl(cache, securityService,
+        new AcceptorBuilder(), true, true,
+        () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
         (a, b, c) -> clientHealthMonitor, a -> advisor);
 
     server.start();
@@ -186,8 +193,9 @@ public class CacheServerImplTest {
 
   @Test
   public void stopNotifiesResourceEventCacheServerStart() throws IOException {
-    InternalCacheServer server = new CacheServerImpl(cache, securityService, 
new AcceptorBuilder(),
-        true, true, () -> socketCreator, (a, b, c, d, e, f, g) -> 
cacheClientNotifier,
+    InternalCacheServer server = new CacheServerImpl(cache, securityService,
+        new AcceptorBuilder(), true, true,
+        () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
         (a, b, c) -> clientHealthMonitor, a -> advisor);
     server.start();
 
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionPartialMockTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionPartialMockTest.java
index c345d6b..8d4e38c 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionPartialMockTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionPartialMockTest.java
@@ -51,6 +51,7 @@ import 
org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
 import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import 
org.apache.geode.internal.cache.tier.sockets.ClientRegistrationEventQueueManager;
 import org.apache.geode.internal.cache.tier.sockets.ConnectionListener;
 
 public class LocalRegionPartialMockTest {
@@ -226,8 +227,9 @@ public class LocalRegionPartialMockTest {
     when(cache.getCCPTimer()).thenReturn(mock(SystemTimer.class));
 
     CacheClientNotifier ccn =
-        CacheClientNotifier.getInstance(cache, mock(CacheServerStats.class), 
10,
-            10, mock(ConnectionListener.class), null, true);
+        CacheClientNotifier.getInstance(cache, 
mock(ClientRegistrationEventQueueManager.class),
+            mock(CacheServerStats.class), 10, 10,
+            mock(ConnectionListener.class), null, true);
 
     
doCallRealMethod().when(region).notifyClientsOfTombstoneGC(regionGCVersions, 
keysRemoved,
         eventID, routing);
@@ -248,8 +250,9 @@ public class LocalRegionPartialMockTest {
     when(cache.getCCPTimer()).thenReturn(mock(SystemTimer.class));
 
     CacheClientNotifier ccn =
-        CacheClientNotifier.getInstance(cache, mock(CacheServerStats.class), 
10,
-            10, mock(ConnectionListener.class), null, true);
+        CacheClientNotifier.getInstance(cache, 
mock(ClientRegistrationEventQueueManager.class),
+            mock(CacheServerStats.class), 10, 10,
+            mock(ConnectionListener.class), null, true);
 
     when(proxy.getProxyID()).thenReturn(mock(ClientProxyMembershipID.class));
     ccn.addClientProxyToMap(proxy);
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
index a67b2d6..b01103f 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache.tier.sockets;
 
+import static java.util.Collections.emptyList;
 import static 
org.apache.geode.cache.server.CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS;
 import static 
org.apache.geode.cache.server.CacheServer.DEFAULT_SOCKET_BUFFER_SIZE;
 import static org.apache.geode.cache.server.CacheServer.DEFAULT_TCP_NO_DELAY;
@@ -29,7 +30,6 @@ import static org.mockito.quality.Strictness.STRICT_STUBS;
 
 import java.net.ServerSocket;
 import java.net.SocketAddress;
-import java.util.Collections;
 import java.util.Properties;
 
 import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
@@ -96,8 +96,8 @@ public class AcceptorImplTest {
         DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache, MINIMUM_MAX_CONNECTIONS, 0,
         CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, 
CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null,
         null, DEFAULT_TCP_NO_DELAY, serverConnectionFactory, 1000, 
securityService,
-        () -> socketCreator, (a, b, c, d, e, f, g) -> cacheClientNotifier,
-        (a, b, c) -> clientHealthMonitor, false, Collections.emptyList());
+        () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
+        (a, b, c) -> clientHealthMonitor, false, emptyList());
 
     assertThat(acceptor.isGatewayReceiver()).isFalse();
   }
@@ -114,8 +114,8 @@ public class AcceptorImplTest {
         DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache, MINIMUM_MAX_CONNECTIONS, 0,
         CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, 
CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null,
         null, DEFAULT_TCP_NO_DELAY, serverConnectionFactory, 1000, 
securityService,
-        () -> socketCreator, (a, b, c, d, e, f, g) -> cacheClientNotifier,
-        (a, b, c) -> clientHealthMonitor, true, Collections.emptyList());
+        () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
+        (a, b, c) -> clientHealthMonitor, true, emptyList());
 
     assertThat(acceptor.isGatewayReceiver()).isTrue();
   }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
index 31f059b..d48ab37 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
@@ -12,11 +12,13 @@
  * or implied. See the License for the specific language governing permissions 
and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.cache.tier.sockets;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -48,9 +50,11 @@ import org.apache.geode.internal.cache.FilterProfile;
 import org.apache.geode.internal.cache.FilterRoutingInfo;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalCacheEvent;
+import org.apache.geode.internal.cache.RegionQueueException;
 import org.apache.geode.test.fake.Fakes;
 
 public class CacheClientNotifierTest {
+
   @Test
   public void 
eventsInClientRegistrationQueueAreSentToClientAfterRegistrationIsComplete()
       throws IOException, ClassNotFoundException, NoSuchMethodException, 
IllegalAccessException,
@@ -63,11 +67,13 @@ public class CacheClientNotifierTest {
     CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
     ClientUpdateMessageImpl clientUpdateMessage = 
mock(ClientUpdateMessageImpl.class);
     ClientRegistrationMetadata clientRegistrationMetadata = 
mock(ClientRegistrationMetadata.class);
+
     when(clientRegistrationMetadata.getClientProxyMembershipID()).thenReturn(
         clientProxyMembershipID);
 
     CacheClientNotifier cacheClientNotifier = 
CacheClientNotifier.getInstance(internalCache,
-        cacheServerStats, 0, 0, connectionListener, null, false);
+        new ClientRegistrationEventQueueManager(), cacheServerStats, 0, 0,
+        connectionListener, null, false);
     final CacheClientNotifier cacheClientNotifierSpy = 
spy(cacheClientNotifier);
 
     CountDownLatch waitForEventDispatchCountdownLatch = new CountDownLatch(1);
@@ -137,6 +143,48 @@ public class CacheClientNotifierTest {
     verify(cacheClientProxy, 
times(1)).deliverMessage(isA(HAEventWrapper.class));
   }
 
+  @Test
+  public void clientRegistrationFailsQueueStillDrained()
+      throws ClassNotFoundException, NoSuchMethodException, 
InvocationTargetException,
+      IllegalAccessException, IOException {
+    InternalCache internalCache = Fakes.cache();
+    CacheServerStats cacheServerStats = mock(CacheServerStats.class);
+    Socket socket = mock(Socket.class);
+    ConnectionListener connectionListener = mock(ConnectionListener.class);
+    ClientRegistrationMetadata clientRegistrationMetadata = 
mock(ClientRegistrationMetadata.class);
+    ClientProxyMembershipID clientProxyMembershipID = 
mock(ClientProxyMembershipID.class);
+    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
+        mock(ClientRegistrationEventQueueManager.class);
+    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue 
clientRegistrationEventQueue =
+        
mock(ClientRegistrationEventQueueManager.ClientRegistrationEventQueue.class);
+
+    when(clientRegistrationMetadata.getClientProxyMembershipID()).thenReturn(
+        clientProxyMembershipID);
+    
when(clientRegistrationEventQueueManager.create(eq(clientProxyMembershipID), 
any(), any()))
+        .thenReturn(clientRegistrationEventQueue);
+
+    CacheClientNotifier cacheClientNotifier = 
CacheClientNotifier.getInstance(internalCache,
+        clientRegistrationEventQueueManager, cacheServerStats, 0, 0,
+        connectionListener, null, false);
+    CacheClientNotifier cacheClientNotifierSpy = spy(cacheClientNotifier);
+
+    doAnswer((i) -> {
+      throw new RegionQueueException();
+    
}).when(cacheClientNotifierSpy).registerClientInternal(clientRegistrationMetadata,
 socket,
+        false, 0, true);
+
+    assertThatThrownBy(() -> 
cacheClientNotifierSpy.registerClient(clientRegistrationMetadata,
+        socket, false, 0, true))
+            .isInstanceOf(IOException.class);
+
+    verify(clientRegistrationEventQueueManager, times(1)).create(
+        eq(clientProxyMembershipID), any(), any());
+
+    verify(clientRegistrationEventQueueManager, times(1)).drain(
+        eq(clientRegistrationEventQueue),
+        eq(cacheClientNotifierSpy));
+  }
+
   private InternalCacheEvent createMockInternalCacheEvent(
       final ClientProxyMembershipID clientProxyMembershipID,
       final ClientUpdateMessageImpl clientUpdateMessage,
@@ -188,8 +236,10 @@ public class CacheClientNotifierTest {
     InternalCache internalCache = Fakes.cache();
 
     CacheClientNotifier ccn =
-        CacheClientNotifier.getInstance(internalCache, 
mock(CacheServerStats.class), 10,
-            10, mock(ConnectionListener.class), null, true);
+        CacheClientNotifier.getInstance(internalCache,
+            mock(ClientRegistrationEventQueueManager.class),
+            mock(CacheServerStats.class), 10, 10,
+            mock(ConnectionListener.class), null, true);
 
     assertFalse(CacheClientNotifier.singletonHasClientProxies());
     ccn.shutdown(111);
@@ -202,8 +252,10 @@ public class CacheClientNotifierTest {
     CacheClientProxy proxy = mock(CacheClientProxy.class);
 
     CacheClientNotifier ccn =
-        CacheClientNotifier.getInstance(internalCache, 
mock(CacheServerStats.class), 10,
-            10, mock(ConnectionListener.class), null, true);
+        CacheClientNotifier.getInstance(internalCache,
+            mock(ClientRegistrationEventQueueManager.class),
+            mock(CacheServerStats.class), 10, 10,
+            mock(ConnectionListener.class), null, true);
 
     when(proxy.getProxyID()).thenReturn(mock(ClientProxyMembershipID.class));
     ccn.addClientProxy(proxy);
@@ -221,8 +273,10 @@ public class CacheClientNotifierTest {
     CacheClientProxy proxy = mock(CacheClientProxy.class);
 
     CacheClientNotifier ccn =
-        CacheClientNotifier.getInstance(internalCache, 
mock(CacheServerStats.class), 10,
-            10, mock(ConnectionListener.class), null, true);
+        CacheClientNotifier.getInstance(internalCache,
+            mock(ClientRegistrationEventQueueManager.class),
+            mock(CacheServerStats.class), 10, 10,
+            mock(ConnectionListener.class), null, true);
 
     when(proxy.getProxyID()).thenReturn(mock(ClientProxyMembershipID.class));
     ccn.addClientInitProxy(proxy);
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
index f392e7b..16a4042 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
@@ -16,20 +16,21 @@
 package org.apache.geode.internal.cache.tier.sockets;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.junit.Test;
@@ -44,35 +45,15 @@ import org.apache.geode.internal.cache.LocalRegion;
 
 public class ClientRegistrationEventQueueManagerTest {
   @Test
-  public void 
messageDeliveredAfterRegisteringOnDrainIfNewFilterIDsIncludesClient()
-      throws ExecutionException, InterruptedException {
+  public void 
messageDeliveredAfterRegisteringOnDrainIfNewFilterIDsIncludesClient() {
     ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
         new ClientRegistrationEventQueueManager();
 
     ClientProxyMembershipID clientProxyMembershipID = 
mock(ClientProxyMembershipID.class);
 
-    ReadWriteLock mockPutDrainLock = mock(ReadWriteLock.class);
-    ReadWriteLock actualPutDrainLock = new ReentrantReadWriteLock();
-
-    CountDownLatch waitForDrainAndRemoveLatch = new CountDownLatch(1);
-    CountDownLatch waitForAddInProgressLatch = new CountDownLatch(1);
-
-    when(mockPutDrainLock.readLock())
-        .thenAnswer(i -> {
-          waitForAddInProgressLatch.countDown();
-          waitForDrainAndRemoveLatch.await();
-          return actualPutDrainLock.readLock();
-        });
-
-    when(mockPutDrainLock.writeLock())
-        .thenAnswer(i -> {
-          waitForAddInProgressLatch.await();
-          waitForDrainAndRemoveLatch.countDown();
-          return actualPutDrainLock.writeLock();
-        });
-
-    clientRegistrationEventQueueManager.create(clientProxyMembershipID,
-        new ConcurrentLinkedQueue<>(), mockPutDrainLock, new ReentrantLock());
+    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue 
clientRegistrationEventQueue =
+        clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+            new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
 
     InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
     LocalRegion localRegion = mock(LocalRegion.class);
@@ -97,24 +78,18 @@ public class ClientRegistrationEventQueueManagerTest {
         clientUpdateMessage))
             .thenReturn(recalculatedFilterClientIDs);
     CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
+    when(cacheClientProxy.getProxyID()).thenReturn(clientProxyMembershipID);
     
when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
 
     // Create empty filter client IDs produced by the "normal" put processing path, so we can test
     // that the event is still delivered if the client finished registering and needs the event.
     Set<ClientProxyMembershipID> normalPutFilterClientIDs = new HashSet<>();
-    CompletableFuture<Void> addEventsToQueueTask = 
CompletableFuture.runAsync(() -> {
-      // In thread one, we add and event to the queue
-      clientRegistrationEventQueueManager
-          .add(internalCacheEvent, clientUpdateMessage, 
normalPutFilterClientIDs,
-              cacheClientNotifier);
-    });
 
-    CompletableFuture<Void> drainEventsFromQueueTask = 
CompletableFuture.runAsync(() -> {
-      // In thread two, we drain the event from the queue
-      clientRegistrationEventQueueManager.drain(clientProxyMembershipID, 
cacheClientNotifier);
-    });
+    clientRegistrationEventQueueManager
+        .add(internalCacheEvent, clientUpdateMessage, normalPutFilterClientIDs,
+            cacheClientNotifier);
 
-    CompletableFuture.allOf(addEventsToQueueTask, 
drainEventsFromQueueTask).get();
+    clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, 
cacheClientNotifier);
 
     // The client update message should still be delivered because it is now part of the
     // filter clients interested in this event, despite having not been included in the original
@@ -130,7 +105,7 @@ public class ClientRegistrationEventQueueManagerTest {
     ClientProxyMembershipID clientProxyMembershipID = 
mock(ClientProxyMembershipID.class);
 
     clientRegistrationEventQueueManager.create(clientProxyMembershipID,
-        new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock(), new 
ReentrantLock());
+        new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
 
     InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
     when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
@@ -156,13 +131,43 @@ public class ClientRegistrationEventQueueManagerTest {
   }
 
   @Test
-  public void addAndDrainQueueContentionTest() throws ExecutionException, 
InterruptedException {
+  public void 
putInProgressCounterIncrementedOnAddAndDecrementedOnRemoveForAllEvents() {
     ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
         new ClientRegistrationEventQueueManager();
 
     ClientProxyMembershipID clientProxyMembershipID = 
mock(ClientProxyMembershipID.class);
-    ReadWriteLock mockPutDrainLock = mock(ReadWriteLock.class);
-    ReadWriteLock actualPutDrainLock = new ReentrantReadWriteLock();
+
+    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue 
clientRegistrationEventQueue =
+        clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+            new ConcurrentLinkedQueue<>(),
+            new ReentrantReadWriteLock());
+
+    List<HAEventWrapper> haEventWrappers = new ArrayList<>();
+    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
+
+    for (int i = 0; i < 5; ++i) {
+      HAEventWrapper haEventWrapper = mock(HAEventWrapper.class);
+      haEventWrappers.add(haEventWrapper);
+      InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
+      when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
+      
when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
+      clientRegistrationEventQueueManager.add(internalCacheEvent,
+          haEventWrapper, new HashSet<>(), cacheClientNotifier);
+      verify(haEventWrapper, 
times(1)).incrementPutInProgressCounter(anyString());
+    }
+
+    clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, 
cacheClientNotifier);
+
+    for (HAEventWrapper haEventWrapper : haEventWrappers) {
+      verify(haEventWrapper, times(1)).decrementPutInProgressCounter();
+    }
+  }
+
+  @Test
+  public void addAndDrainQueueContentionTest() throws ExecutionException, 
InterruptedException {
+    ClientProxyMembershipID clientProxyMembershipID = 
mock(ClientProxyMembershipID.class);
+    ReentrantReadWriteLock mockPutDrainLock = 
mock(ReentrantReadWriteLock.class);
+    ReentrantReadWriteLock actualPutDrainLock = new ReentrantReadWriteLock();
 
     when(mockPutDrainLock.readLock())
         .thenReturn(actualPutDrainLock.readLock());
@@ -174,9 +179,12 @@ public class ClientRegistrationEventQueueManagerTest {
           return actualPutDrainLock.writeLock();
         });
 
+    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
+        new ClientRegistrationEventQueueManager();
+
     ClientRegistrationEventQueueManager.ClientRegistrationEventQueue 
clientRegistrationEventQueue =
         clientRegistrationEventQueueManager.create(clientProxyMembershipID,
-            new ConcurrentLinkedQueue<>(), mockPutDrainLock, new 
ReentrantLock());
+            new ConcurrentLinkedQueue<>(), mockPutDrainLock);
 
     InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
     when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
@@ -185,6 +193,8 @@ public class ClientRegistrationEventQueueManagerTest {
     Conflatable conflatable = mock(Conflatable.class);
     Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
     CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
+    CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
+    
when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
 
     CompletableFuture<Void> addEventsToQueueTask = 
CompletableFuture.runAsync(() -> {
       for (int numAdds = 0; numAdds < 100000; ++numAdds) {
@@ -196,7 +206,7 @@ public class ClientRegistrationEventQueueManagerTest {
 
     CompletableFuture<Void> drainEventsFromQueueTask = 
CompletableFuture.runAsync(() -> {
       // In thread two, we drain events from the queue
-      clientRegistrationEventQueueManager.drain(clientProxyMembershipID, 
cacheClientNotifier);
+      clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, 
cacheClientNotifier);
     });
 
     CompletableFuture.allOf(addEventsToQueueTask, 
drainEventsFromQueueTask).get();
@@ -205,77 +215,154 @@ public class ClientRegistrationEventQueueManagerTest {
   }
 
   @Test
-  public void twoThreadsRegisteringSameClientNoEventsLost()
-      throws ExecutionException, InterruptedException {
-    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
-        new ClientRegistrationEventQueueManager();
-
-    InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
+  public void addEventWithOffheapValueCopiedToHeap() {
+    EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
     when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
-    when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
+    Operation mockOperation = mock(Operation.class);
+    when(mockOperation.isEntry()).thenReturn(true);
+    when(internalCacheEvent.getOperation()).thenReturn(mockOperation);
 
     Conflatable conflatable = mock(Conflatable.class);
     Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
     CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
     ClientProxyMembershipID clientProxyMembershipID = 
mock(ClientProxyMembershipID.class);
 
+    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
+        new ClientRegistrationEventQueueManager();
+
+    clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+        new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
+
+    clientRegistrationEventQueueManager
+        .add(internalCacheEvent, conflatable, filterClientIDs, 
cacheClientNotifier);
+
+    verify(internalCacheEvent, times(1)).copyOffHeapToHeap();
+  }
+
+  @Test
+  public void clientWasNeverRegisteredDrainQueueStillRemoved() {
+    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
+        new ClientRegistrationEventQueueManager();
+
+    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
+    ClientProxyMembershipID clientProxyMembershipID = 
mock(ClientProxyMembershipID.class);
+
     ClientRegistrationEventQueueManager.ClientRegistrationEventQueue 
clientRegistrationEventQueue =
         clientRegistrationEventQueueManager.create(clientProxyMembershipID,
-            new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock(), new 
ReentrantLock());
-
-    for (int registrationIterations = 0; registrationIterations < 1000; 
++registrationIterations) {
-      Runnable clientRegistrationSimulation = () -> {
-        for (int numAdds = 0; numAdds < getRandomNumberOfAdds(); ++numAdds) {
-          // In thread one, we add events to the queue
-          clientRegistrationEventQueueManager
-              .add(internalCacheEvent, conflatable, filterClientIDs, 
cacheClientNotifier);
-        }
-        // In thread two, we drain events from the queue
-        clientRegistrationEventQueueManager.drain(clientProxyMembershipID, 
cacheClientNotifier);
-      };
-
-      CompletableFuture<Void> registrationFutureOne =
-          CompletableFuture.runAsync(clientRegistrationSimulation);
-      CompletableFuture<Void> registrationFutureTwo =
-          CompletableFuture.runAsync(clientRegistrationSimulation);
-
-      CompletableFuture.allOf(registrationFutureOne, 
registrationFutureTwo).get();
-
-      assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
-    }
+            new ConcurrentLinkedQueue<>(),
+            new ReentrantReadWriteLock());
+
+    clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, 
cacheClientNotifier);
+
+    EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
+    Conflatable conflatable = mock(Conflatable.class);
+    Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
+
+    // Pass a new event to the ClientRegistrationEventQueueManager. This event 
should not be added
+    // to the test client's registration queue, because that queue should already be 
removed. We can
+    // validate that by asserting that the client's registration queue is 
empty after the add.
+    clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, 
filterClientIDs,
+        cacheClientNotifier);
+
+    assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
   }
 
   @Test
-  public void addEventWithOffheapValueCopiedToHeap() {
+  public void drainThrowsExceptionQueueStillRemoved() {
+    CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
+    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
+    ClientProxyMembershipID clientProxyMembershipID = 
mock(ClientProxyMembershipID.class);
+    
when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
+
     ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
         new ClientRegistrationEventQueueManager();
 
+    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue 
clientRegistrationEventQueue =
+        clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+            new ConcurrentLinkedQueue<>(),
+            new ReentrantReadWriteLock());
+
+    Conflatable conflatable = mock(Conflatable.class);
+    Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
+
     EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
-    when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
+    RuntimeException testException = new RuntimeException();
+    when(internalCacheEvent.getRegion()).thenThrow(testException);
     Operation mockOperation = mock(Operation.class);
     when(mockOperation.isEntry()).thenReturn(true);
     when(internalCacheEvent.getOperation()).thenReturn(mockOperation);
 
-    Conflatable conflatable = mock(Conflatable.class);
-    Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
-    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
-    ClientProxyMembershipID clientProxyMembershipID = 
mock(ClientProxyMembershipID.class);
+    clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, 
filterClientIDs,
+        cacheClientNotifier);
 
-    clientRegistrationEventQueueManager.create(clientProxyMembershipID,
-        new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock(), new 
ReentrantLock());
+    assertThatThrownBy(() -> 
clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue,
+        cacheClientNotifier))
+            .isEqualTo(testException);
 
-    clientRegistrationEventQueueManager
-        .add(internalCacheEvent, conflatable, filterClientIDs, 
cacheClientNotifier);
+    // Pass a new event to the ClientRegistrationEventQueueManager. This event 
should not be added
+    // to the test client's registration queue, because that queue should already be 
removed. We can
+    // validate that by asserting that the client's registration queue is 
empty after the add.
+    clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, 
filterClientIDs,
+        cacheClientNotifier);
 
-    verify(internalCacheEvent, times(1)).copyOffHeapToHeap();
+    assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
   }
 
-  /*
-   * This helps to create contention between registration threads during the 
drain phase
-   */
-  private static int getRandomNumberOfAdds() {
-    int min = 10_000;
-    int max = 50_000;
-    return ThreadLocalRandom.current().nextInt(min, max + 1);
+  @Test
+  public void 
addEventInOriginalFilterIDsButQueueWasRemovedDueToSuccessfulRegistrationSoEventNotRedelivered()
 {
+    ClientProxyMembershipID clientProxyMembershipID = 
mock(ClientProxyMembershipID.class);
+    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
+    CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
+    
when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
+    Set<ClientProxyMembershipID> originalFilterIDs = new HashSet<>();
+    originalFilterIDs.add(clientProxyMembershipID);
+
+    ClientUpdateMessageImpl clientUpdateMessage = 
mock(ClientUpdateMessageImpl.class);
+
+    InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
+    LocalRegion localRegion = mock(LocalRegion.class);
+    FilterProfile filterProfile = mock(FilterProfile.class);
+    FilterRoutingInfo filterRoutingInfo = mock(FilterRoutingInfo.class);
+    FilterRoutingInfo.FilterInfo filterInfo = 
mock(FilterRoutingInfo.FilterInfo.class);
+
+    when(filterRoutingInfo.getLocalFilterInfo()).thenReturn(
+        filterInfo);
+    when(filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent))
+        .thenReturn(filterRoutingInfo);
+    when(localRegion.getFilterProfile()).thenReturn(filterProfile);
+    when(internalCacheEvent.getRegion()).thenReturn(localRegion);
+    when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
+
+    Set<ClientProxyMembershipID> recalculatedFilterClientIDs = new HashSet<>();
+    recalculatedFilterClientIDs.add(clientProxyMembershipID);
+    when(cacheClientNotifier.getFilterClientIDs(internalCacheEvent, 
filterProfile, filterInfo,
+        clientUpdateMessage))
+            .thenReturn(recalculatedFilterClientIDs);
+    when(cacheClientProxy.getProxyID()).thenReturn(clientProxyMembershipID);
+    
when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
+    ReentrantReadWriteLock mockReadWriteLock = 
mock(ReentrantReadWriteLock.class);
+
+    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
+        new ClientRegistrationEventQueueManager();
+
+    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue 
clientRegistrationEventQueue =
+        clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+            new ConcurrentLinkedQueue<>(),
+            mockReadWriteLock);
+
+    ReentrantReadWriteLock.ReadLock mockReadLock = 
mock(ReentrantReadWriteLock.ReadLock.class);
+    when(mockReadWriteLock.readLock()).thenReturn(mockReadLock);
+    ReentrantReadWriteLock actualPutDrainLock = new ReentrantReadWriteLock();
+    
when(mockReadWriteLock.writeLock()).thenReturn(actualPutDrainLock.writeLock());
+    doAnswer(i -> {
+      clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, 
cacheClientNotifier);
+      actualPutDrainLock.readLock();
+      return null;
+    }).when(mockReadLock).lock();
+
+    clientRegistrationEventQueueManager.add(internalCacheEvent, 
clientUpdateMessage,
+        originalFilterIDs, cacheClientNotifier);
+
+    verify(cacheClientProxy, times(0)).deliverMessage(clientUpdateMessage);
   }
 }

Reply via email to