This is an automated email from the ASF dual-hosted git repository.
onichols pushed a commit to branch release/1.10.0
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/release/1.10.0 by this push:
new 920723c GEODE-7089: Each client registration thread uses its own
queue (#3976)
920723c is described below
commit 920723c5b56b9c00ccb2366adb5e4ba0f0f06927
Author: Ryan McMahon <[email protected]>
AuthorDate: Tue Aug 27 09:29:53 2019 -0700
GEODE-7089: Each client registration thread uses its own queue (#3976)
Co-authored-by: Ryan McMahon <[email protected]>
Co-authored-by: Donal Evans <[email protected]>
(cherry picked from commit 5d0153ad4adb1612a1083673f98b1982819a6589)
---
.../cache/ha/HARegionQueueIntegrationTest.java | 44 +--
.../internal/cache/tier/sockets/AcceptorImpl.java | 11 +-
.../cache/tier/sockets/CacheClientNotifier.java | 61 ++--
.../ClientRegistrationEventQueueManager.java | 357 ++++++++++-----------
.../tier/sockets/ClientUpdateMessageImpl.java | 4 -
.../geode/internal/cache/BucketRegionTest.java | 10 +-
.../geode/internal/cache/CacheServerImplTest.java | 40 ++-
.../internal/cache/LocalRegionPartialMockTest.java | 11 +-
.../cache/tier/sockets/AcceptorImplTest.java | 10 +-
.../tier/sockets/CacheClientNotifierTest.java | 70 +++-
.../ClientRegistrationEventQueueManagerTest.java | 271 ++++++++++------
11 files changed, 498 insertions(+), 391 deletions(-)
diff --git
a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
index 6592dce..5e19f3f 100644
---
a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
+++
b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
@@ -20,7 +20,6 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -50,8 +49,6 @@ import org.mockito.MockitoAnnotations;
import util.TestException;
import org.apache.geode.CancelCriterion;
-import org.apache.geode.Statistics;
-import org.apache.geode.StatisticsType;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
@@ -61,12 +58,7 @@ import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.Scope;
-import org.apache.geode.distributed.internal.DSClock;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.CachedDeserializable;
@@ -81,6 +73,7 @@ import org.apache.geode.internal.cache.VMCachedDeserializable;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import
org.apache.geode.internal.cache.tier.sockets.ClientRegistrationEventQueueManager;
import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessage;
import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl;
import org.apache.geode.internal.cache.tier.sockets.ConnectionListener;
@@ -108,7 +101,6 @@ public class HARegionQueueIntegrationTest {
dataRegion = createDataRegion();
ccn = createCacheClientNotifier();
member = createMember();
- HAContainerWrapper haContainerWrapper = (HAContainerWrapper)
ccn.getHaContainer();
}
@After
@@ -125,38 +117,12 @@ public class HARegionQueueIntegrationTest {
return cache.createRegionFactory(RegionShortcut.REPLICATE).create("data");
}
- private InternalCache createMockInternalCache() {
- InternalCache mockInternalCache = mock(InternalCache.class);
- doReturn(mock(SystemTimer.class)).when(mockInternalCache).getCCPTimer();
-
doReturn(mock(CancelCriterion.class)).when(mockInternalCache).getCancelCriterion();
-
- InternalDistributedSystem mockInteralDistributedSystem =
createMockInternalDistributedSystem();
-
doReturn(mockInteralDistributedSystem).when(mockInternalCache).getInternalDistributedSystem();
-
doReturn(mockInteralDistributedSystem).when(mockInternalCache).getDistributedSystem();
-
- return mockInternalCache;
- }
-
- private InternalDistributedSystem createMockInternalDistributedSystem() {
- InternalDistributedSystem mockInternalDistributedSystem =
- mock(InternalDistributedSystem.class);
- DistributionManager mockDistributionManager =
mock(DistributionManager.class);
-
-
doReturn(mock(InternalDistributedMember.class)).when(mockInternalDistributedSystem)
- .getDistributedMember();
- doReturn(mock(Statistics.class)).when(mockInternalDistributedSystem)
- .createAtomicStatistics(any(StatisticsType.class), any(String.class));
-
doReturn(mock(DistributionConfig.class)).when(mockDistributionManager).getConfig();
-
doReturn(mockDistributionManager).when(mockInternalDistributedSystem).getDistributionManager();
-
doReturn(mock(DSClock.class)).when(mockInternalDistributedSystem).getClock();
-
- return mockInternalDistributedSystem;
- }
-
private CacheClientNotifier createCacheClientNotifier() {
CacheClientNotifier ccn =
- CacheClientNotifier.getInstance((InternalCache) cache,
mock(CacheServerStats.class),
- 100000, 100000, mock(ConnectionListener.class), null, false);
+ CacheClientNotifier.getInstance((InternalCache) cache,
+ mock(ClientRegistrationEventQueueManager.class),
mock(CacheServerStats.class),
+ 100000, 100000, mock(ConnectionListener.class),
+ null, false);
return ccn;
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 8fdd78a..ed34572 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -400,15 +400,14 @@ public class AcceptorImpl implements Acceptor, Runnable {
* @param maxConnections the maximum number of connections allowed in the
server pool
* @param maxThreads the maximum number of threads allowed in the server pool
* @param securityService the SecurityService to use for authentication and
authorization
- * @param gatewayReceiver the GatewayReceiver that will use this
AcceptorImpl instance
- * @param gatewayReceiverMetrics the GatewayReceiverMetrics to use for
exposing metrics
* @param gatewayTransportFilters List of GatewayTransportFilters
*/
AcceptorImpl(final int port, final String bindHostName, final boolean
notifyBySubscription,
final int socketBufferSize, final int maximumTimeBetweenPings,
final InternalCache internalCache, final int maxConnections, final int
maxThreads,
final int maximumMessageCount, final int messageTimeToLive,
- final ConnectionListener connectionListener, final OverflowAttributes
overflowAttributes,
+ final ConnectionListener connectionListener,
+ final OverflowAttributes overflowAttributes,
final boolean tcpNoDelay, final ServerConnectionFactory
serverConnectionFactory,
final long timeLimitMillis, final SecurityService securityService,
final Supplier<SocketCreator> socketCreatorSupplier,
@@ -609,8 +608,10 @@ public class AcceptorImpl implements Acceptor, Runnable {
cache = internalCache;
crHelper = new CachedRegionHelper(cache);
- clientNotifier = cacheClientNotifierProvider.get(internalCache, stats,
maximumMessageCount,
- messageTimeToLive, this.connectionListener, overflowAttributes,
isGatewayReceiver());
+ clientNotifier =
+ cacheClientNotifierProvider.get(internalCache, new
ClientRegistrationEventQueueManager(),
+ stats, maximumMessageCount, messageTimeToLive,
this.connectionListener,
+ overflowAttributes, isGatewayReceiver());
this.socketBufferSize = socketBufferSize;
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index 090d2b2..3c8134f 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -40,7 +40,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
@@ -126,22 +125,27 @@ public class CacheClientNotifier {
private static volatile CacheClientNotifier ccnSingleton;
private final SocketMessageWriter socketMessageWriter = new
SocketMessageWriter();
- private final ClientRegistrationEventQueueManager registrationQueueManager =
- new ClientRegistrationEventQueueManager();
+ private final ClientRegistrationEventQueueManager
clientRegistrationEventQueueManager;
/**
* Factory method to construct a CacheClientNotifier {@code
CacheClientNotifier} instance.
*
* @param cache The GemFire {@code InternalCache}
+ * @param clientRegistrationEventQueueManager Manages temporary registration
queues for clients
* @return A {@code CacheClientNotifier} instance
*/
public static synchronized CacheClientNotifier getInstance(InternalCache
cache,
- CacheServerStats acceptorStats, int maximumMessageCount, int
messageTimeToLive,
- ConnectionListener listener, OverflowAttributes overflowAttributes,
+ ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
+ CacheServerStats acceptorStats,
+ int maximumMessageCount,
+ int messageTimeToLive,
+ ConnectionListener listener,
+ OverflowAttributes overflowAttributes,
boolean isGatewayReceiver) {
if (ccnSingleton == null) {
- ccnSingleton = new CacheClientNotifier(cache, acceptorStats,
maximumMessageCount,
- messageTimeToLive, listener, isGatewayReceiver);
+ ccnSingleton =
+ new CacheClientNotifier(cache, clientRegistrationEventQueueManager,
acceptorStats,
+ maximumMessageCount, messageTimeToLive, listener,
isGatewayReceiver);
}
if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
@@ -177,16 +181,16 @@ public class CacheClientNotifier {
try {
if (isClientPermitted(clientRegistrationMetadata,
clientProxyMembershipID)) {
- registrationQueueManager.create(clientProxyMembershipID, new
ConcurrentLinkedQueue<>(),
- new ReentrantReadWriteLock(), new ReentrantLock());
+ ClientRegistrationEventQueueManager.ClientRegistrationEventQueue
clientRegistrationEventQueue =
+ clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+ new ConcurrentLinkedQueue<>(),
+ new ReentrantReadWriteLock());
try {
registerClientInternal(clientRegistrationMetadata, socket,
isPrimary, acceptorId,
notifyBySubscription);
} finally {
- if (isProxyInitialized(clientProxyMembershipID)) {
- registrationQueueManager.drain(clientProxyMembershipID, this);
- }
+
clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, this);
}
}
} catch (AuthenticationRequiredException ex) {
@@ -288,8 +292,8 @@ public class CacheClientNotifier {
}
cacheClientProxy =
new CacheClientProxy(this, socket, clientProxyMembershipID,
isPrimary, clientConflation,
- clientVersion,
- acceptorId, notifyBySubscription, cache.getSecurityService(),
subject);
+ clientVersion, acceptorId, notifyBySubscription,
cache.getSecurityService(),
+ subject);
successful = initializeProxy(cacheClientProxy);
} else {
cacheClientProxy.setSubject(subject);
@@ -681,7 +685,7 @@ public class CacheClientNotifier {
conflatable = wrapper;
}
- registrationQueueManager.add(event, conflatable, filterClients, this);
+ clientRegistrationEventQueueManager.add(event, conflatable, filterClients,
this);
singletonRouteClientMessage(conflatable, filterClients);
@@ -1185,16 +1189,6 @@ public class CacheClientNotifier {
}
/**
- * Determines whether a client proxy has been initialized
- *
- * @param clientProxyMembershipID The client proxy membership ID
- * @return Whether the client proxy is initialized
- */
- private boolean isProxyInitialized(final ClientProxyMembershipID
clientProxyMembershipID) {
- return getClientProxy(clientProxyMembershipID) != null;
- }
-
- /**
* Returns the {@code CacheClientProxy} associated to the membershipID *
*
* @return the {@code CacheClientProxy} associated to the membershipID
@@ -1696,11 +1690,14 @@ public class CacheClientNotifier {
* @param cache The GemFire {@code InternalCache}
* @param listener a listener which should receive notifications abouts
queues being added or
*/
- private CacheClientNotifier(InternalCache cache, CacheServerStats
acceptorStats,
- int maximumMessageCount, int messageTimeToLive, ConnectionListener
listener,
- boolean isGatewayReceiver) {
+ private CacheClientNotifier(InternalCache cache,
+ ClientRegistrationEventQueueManager clientRegistrationEventQueueManager,
+ CacheServerStats acceptorStats, int maximumMessageCount,
+ int messageTimeToLive,
+ ConnectionListener listener, boolean isGatewayReceiver) {
// Set the Cache
setCache(cache);
+ this.clientRegistrationEventQueueManager =
clientRegistrationEventQueueManager;
this.acceptorStats = acceptorStats;
// we only need one thread per client and wait 50ms for close
socketCloser = new SocketCloser(1, 50);
@@ -2095,9 +2092,11 @@ public class CacheClientNotifier {
@FunctionalInterface
@VisibleForTesting
public interface CacheClientNotifierProvider {
- CacheClientNotifier get(InternalCache cache, CacheServerStats
acceptorStats,
- int maximumMessageCount, int messageTimeToLive, ConnectionListener
listener,
- OverflowAttributes overflowAttributes, boolean isGatewayReceiver);
+ CacheClientNotifier get(InternalCache cache,
+ ClientRegistrationEventQueueManager
clientRegistrationEventQueueManager,
+ CacheServerStats acceptorStats, int maximumMessageCount, int
messageTimeToLive,
+ ConnectionListener listener, OverflowAttributes overflowAttributes,
+ boolean isGatewayReceiver);
}
@VisibleForTesting
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
index 02df497..8216de6 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
@@ -15,12 +15,9 @@
package org.apache.geode.internal.cache.tier.sockets;
-import java.util.Map;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.logging.log4j.Logger;
@@ -30,6 +27,7 @@ import org.apache.geode.internal.cache.FilterProfile;
import org.apache.geode.internal.cache.FilterRoutingInfo;
import org.apache.geode.internal.cache.InternalCacheEvent;
import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.concurrent.ConcurrentHashSet;
import org.apache.geode.internal.logging.LogService;
/**
@@ -37,10 +35,10 @@ import org.apache.geode.internal.logging.LogService;
* client is completely registered (filter info retrieved and GII complete),
we will drain the
* client's queued events and deliver them to the cache client proxy if
necessary.
*/
-class ClientRegistrationEventQueueManager {
+public class ClientRegistrationEventQueueManager {
private static final Logger logger = LogService.getLogger();
- private final Map<ClientProxyMembershipID, ClientRegistrationEventQueue>
registeringProxyEventQueues =
- new ConcurrentHashMap<>();
+ private final Set<ClientRegistrationEventQueue> registeringProxyEventQueues =
+ new ConcurrentHashSet<>();
void add(final InternalCacheEvent event,
final Conflatable conflatable,
@@ -52,13 +50,7 @@ class ClientRegistrationEventQueueManager {
ClientRegistrationEvent clientRegistrationEvent =
new ClientRegistrationEvent(event, conflatable);
- for (final Map.Entry<ClientProxyMembershipID,
ClientRegistrationEventQueue> eventsReceivedWhileRegisteringClient :
registeringProxyEventQueues
- .entrySet()) {
- ClientProxyMembershipID clientProxyMembershipID =
- eventsReceivedWhileRegisteringClient.getKey();
- ClientRegistrationEventQueue registrationEventQueue =
- eventsReceivedWhileRegisteringClient.getValue();
-
+ for (final ClientRegistrationEventQueue registrationEventQueue :
registeringProxyEventQueues) {
registrationEventQueue.lockForPutting();
try {
// If this is an HAEventWrapper we need to increment the PutInProgress
counter so
@@ -69,10 +61,13 @@ class ClientRegistrationEventQueueManager {
((HAEventWrapper) conflatable).incrementPutInProgressCounter("client
registration");
}
+ ClientProxyMembershipID clientProxyMembershipID =
+ registrationEventQueue.getClientProxyMembershipID();
+
// After taking out the lock, we need to determine if the client is
still actually
// registering since there is a small race where it may have finished
registering
// after we pulled the queue out of the registeringProxyEventQueues
collection
- if (registeringProxyEventQueues.containsKey(clientProxyMembershipID)) {
+ if (registeringProxyEventQueues.contains(registrationEventQueue)) {
// If the event value is off-heap, copy it to heap so we are
guaranteed the value
// is available when we drain the registration queue
copyOffHeapToHeapForRegistrationQueue(event);
@@ -90,7 +85,10 @@ class ClientRegistrationEventQueueManager {
// this event and potentially deliver the conflatable to handle the
edge case where
// the original client filter IDs generated in the "normal" put
processing path did
// not include the registering client because the filter info was
not yet available.
- processEventAndDeliverConflatable(clientProxyMembershipID,
cacheClientNotifier, event,
+ CacheClientProxy cacheClientProxy =
+ cacheClientNotifier.getClientProxy(clientProxyMembershipID);
+
+ processEventAndDeliverConflatable(cacheClientProxy,
cacheClientNotifier, event,
conflatable, originalFilterClientIDs);
}
} finally {
@@ -99,6 +97,132 @@ class ClientRegistrationEventQueueManager {
}
}
+ void drain(final ClientRegistrationEventQueue clientRegistrationEventQueue,
+ final CacheClientNotifier cacheClientNotifier) {
+ try {
+ ClientProxyMembershipID clientProxyMembershipID =
+ clientRegistrationEventQueue.getClientProxyMembershipID();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Draining events from registration queue for client proxy
"
+ + clientProxyMembershipID
+ + " without synchronization");
+ }
+
+ CacheClientProxy cacheClientProxy = cacheClientNotifier
+ .getClientProxy(clientProxyMembershipID);
+
+ drainEventsReceivedWhileRegisteringClient(
+ cacheClientProxy,
+ clientRegistrationEventQueue,
+ cacheClientNotifier);
+
+ // Prevents additional events from being added to the queue while we
process and remove it
+ clientRegistrationEventQueue.lockForDraining();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Draining remaining events from registration queue for
client proxy "
+ + clientProxyMembershipID
+ + " with synchronization");
+ }
+
+ drainEventsReceivedWhileRegisteringClient(
+ cacheClientProxy,
+ clientRegistrationEventQueue,
+ cacheClientNotifier);
+ } finally {
+ // The queue must be removed before attempting to release the drain lock
+ // so that no additional events can be added from add threads.
+ registeringProxyEventQueues.remove(clientRegistrationEventQueue);
+
+ if (clientRegistrationEventQueue.isLockForDrainingHeld()) {
+ clientRegistrationEventQueue.unlockForDraining();
+ }
+ }
+ }
+
+ ClientRegistrationEventQueue create(
+ final ClientProxyMembershipID clientProxyMembershipID,
+ final Queue<ClientRegistrationEvent> eventQueue,
+ final ReentrantReadWriteLock eventAddDrainLock) {
+ final ClientRegistrationEventQueue clientRegistrationEventQueue =
+ new ClientRegistrationEventQueue(clientProxyMembershipID, eventQueue,
+ eventAddDrainLock);
+ registeringProxyEventQueues.add(clientRegistrationEventQueue);
+ return clientRegistrationEventQueue;
+ }
+
+ private void processEventAndDeliverConflatable(final CacheClientProxy
cacheClientProxy,
+ final CacheClientNotifier cacheClientNotifier,
+ final InternalCacheEvent internalCacheEvent,
+ final Conflatable conflatable,
+ final Set<ClientProxyMembershipID> originalFilterClientIDs) {
+ try {
+ // If the cache client proxy is null, the registration was not
successful and the proxy
+ // was never added to the initialized proxy collection managed by the
cache client notifier.
+ // If that is the case, we can just decrement the put in progress
counter on the conflatable
+ // if it is an HAEventWrapper.
+ if (cacheClientProxy != null) {
+ // The first step is to repopulate the filter info for the event to
determine if
+ // the client which was registering has a matching CQ or has
registered interest
+ // in the key for this event. We need to get the filter profile,
filter routing info,
+ // and local filter info in order to do so. If any of these are null,
then there is
+ // no need to proceed as the client is not interested.
+ FilterProfile filterProfile =
+ ((LocalRegion) internalCacheEvent.getRegion()).getFilterProfile();
+
+ if (filterProfile != null) {
+ FilterRoutingInfo filterRoutingInfo =
+ filterProfile.getFilterRoutingInfoPart2(null,
internalCacheEvent);
+
+ if (filterRoutingInfo != null) {
+ FilterRoutingInfo.FilterInfo filterInfo =
filterRoutingInfo.getLocalFilterInfo();
+
+ if (filterInfo != null) {
+ ClientUpdateMessageImpl clientUpdateMessage = conflatable
instanceof HAEventWrapper
+ ? (ClientUpdateMessageImpl) ((HAEventWrapper) conflatable)
+ .getClientUpdateMessage()
+ : (ClientUpdateMessageImpl) conflatable;
+
+ internalCacheEvent.setLocalFilterInfo(filterInfo);
+
+ Set<ClientProxyMembershipID> newFilterClientIDs =
+ cacheClientNotifier.getFilterClientIDs(internalCacheEvent,
filterProfile,
+ filterInfo,
+ clientUpdateMessage);
+
+ ClientProxyMembershipID proxyID = cacheClientProxy.getProxyID();
+ if (eventNotInOriginalFilterClientIDs(proxyID,
newFilterClientIDs,
+ originalFilterClientIDs) &&
newFilterClientIDs.contains(proxyID)) {
+ cacheClientProxy.deliverMessage(conflatable);
+ }
+ }
+ }
+ }
+ }
+ } finally {
+ // Once we have processed the conflatable, if it is an HAEventWrapper we
can
+ // decrement the PutInProgress counter, allowing the ClientUpdateMessage
to be
+ // set to null. See decrementPutInProgressCounter() for more details.
+ if (conflatable instanceof HAEventWrapper) {
+ ((HAEventWrapper) conflatable).decrementPutInProgressCounter();
+ }
+ }
+ }
+
+ /*
+ * This is to handle the edge case where the original filter client IDs
+ * calculated by "normal" put processing did not include the registering
client
+ * because the filter info had not been received yet, but we now found that
the client
+ * is interested in the event so we should deliver it.
+ */
+ private boolean eventNotInOriginalFilterClientIDs(final
ClientProxyMembershipID proxyID,
+ final Set<ClientProxyMembershipID> newFilterClientIDs,
+ final Set<ClientProxyMembershipID> originalFilterClientIDs) {
+ return originalFilterClientIDs == null
+ || (!originalFilterClientIDs.contains(proxyID) &&
newFilterClientIDs.contains(proxyID));
+ }
+
/**
* For simplicity, we will copy off-heap registration queue values to heap
to avoid
* complicated off-heap reference counting. Since the registration queue is
only a
@@ -113,93 +237,54 @@ class ClientRegistrationEventQueueManager {
}
}
- void drain(final ClientProxyMembershipID clientProxyMembershipID,
- final CacheClientNotifier cacheClientNotifier) {
- ClientRegistrationEventQueue registrationEventQueue =
- registeringProxyEventQueues.get(clientProxyMembershipID);
-
- if (registrationEventQueue != null) {
- // It is possible that several client registration threads are active
for the same
- // ClientProxyMembershipID, in which case we only want a single drainer
to drain
- // and remove the queue.
- registrationEventQueue.lockForSingleDrainer();
- try {
- // See if the queue is still available after acquiring the lock as it
may have
- // been removed from registeringProxyEventQueues by the previous thread
- if (registeringProxyEventQueues.containsKey(clientProxyMembershipID)) {
- // As an optimization, we drain as many events from the queue as we
can
- // before taking out a lock to drain the remaining events. When we
lock for draining,
- // it prevents additional events from being added to the queue while
the queue is drained
- // and removed.
- if (logger.isDebugEnabled()) {
- logger.debug("Draining events from registration queue for client
proxy "
- + clientProxyMembershipID
- + " without synchronization");
- }
-
- drainEventsReceivedWhileRegisteringClient(clientProxyMembershipID,
registrationEventQueue,
- cacheClientNotifier);
-
- // Prevents additional events from being added to the queue while we
process and remove it
- registrationEventQueue.lockForDraining();
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("Draining remaining events from registration queue
for client proxy "
- + clientProxyMembershipID + " with synchronization");
- }
-
- drainEventsReceivedWhileRegisteringClient(clientProxyMembershipID,
- registrationEventQueue,
- cacheClientNotifier);
-
- registeringProxyEventQueues.remove(clientProxyMembershipID);
- } finally {
- registrationEventQueue.unlockForDraining();
- }
- }
- } finally {
- registrationEventQueue.unlockForSingleDrainer();
- }
- }
- }
-
- private void drainEventsReceivedWhileRegisteringClient(final
ClientProxyMembershipID proxyID,
+ private void drainEventsReceivedWhileRegisteringClient(
+ final CacheClientProxy cacheClientProxy,
final ClientRegistrationEventQueue registrationEventQueue,
final CacheClientNotifier cacheClientNotifier) {
ClientRegistrationEvent queuedEvent;
while ((queuedEvent = registrationEventQueue.poll()) != null) {
InternalCacheEvent internalCacheEvent = queuedEvent.internalCacheEvent;
Conflatable conflatable = queuedEvent.conflatable;
- processEventAndDeliverConflatable(proxyID, cacheClientNotifier,
internalCacheEvent,
- conflatable, null);
+ processEventAndDeliverConflatable(cacheClientProxy,
+ cacheClientNotifier, internalCacheEvent, conflatable, null);
}
}
- public ClientRegistrationEventQueue create(
- final ClientProxyMembershipID clientProxyMembershipID,
- final Queue<ClientRegistrationEvent> eventQueue,
- final ReadWriteLock eventAddDrainLock,
- final ReentrantLock singleDrainerLock) {
- final ClientRegistrationEventQueue clientRegistrationEventQueue =
- new ClientRegistrationEventQueue(eventQueue,
- eventAddDrainLock, singleDrainerLock);
- registeringProxyEventQueues.putIfAbsent(clientProxyMembershipID,
- clientRegistrationEventQueue);
- return clientRegistrationEventQueue;
+ /**
+ * Represents a conflatable and event processed while a client was
registering.
+ * This needs to be queued and processed after registration is complete. The
conflatable
+ * is what we will actually be delivering to the MessageDispatcher (and
thereby adding
+ * to the HARegionQueue). The internal cache event is required to rehydrate
the filter
+ * info and determine if the client which was registering does have a CQ
that matches or
+ * has registered interest in the key.
+ */
+ private class ClientRegistrationEvent {
+ private final Conflatable conflatable;
+ private final InternalCacheEvent internalCacheEvent;
+
+ ClientRegistrationEvent(final InternalCacheEvent internalCacheEvent,
+ final Conflatable conflatable) {
+ this.conflatable = conflatable;
+ this.internalCacheEvent = internalCacheEvent;
+ }
}
class ClientRegistrationEventQueue {
+ private final ClientProxyMembershipID clientProxyMembershipID;
private final Queue<ClientRegistrationEvent> eventQueue;
- private final ReadWriteLock eventAddDrainLock;
- private final ReentrantLock singleDrainerLock;
+ private final ReentrantReadWriteLock eventAddDrainLock;
ClientRegistrationEventQueue(
+ final ClientProxyMembershipID clientProxyMembershipID,
final Queue<ClientRegistrationEvent> eventQueue,
- final ReadWriteLock eventAddDrainLock,
- final ReentrantLock singleDrainerLock) {
+ final ReentrantReadWriteLock eventAddDrainLock) {
+ this.clientProxyMembershipID = clientProxyMembershipID;
this.eventQueue = eventQueue;
this.eventAddDrainLock = eventAddDrainLock;
- this.singleDrainerLock = singleDrainerLock;
+ }
+
+ public ClientProxyMembershipID getClientProxyMembershipID() {
+ return clientProxyMembershipID;
}
boolean isEmpty() {
@@ -222,6 +307,10 @@ class ClientRegistrationEventQueueManager {
eventAddDrainLock.writeLock().unlock();
}
+ private boolean isLockForDrainingHeld() {
+ return eventAddDrainLock.writeLock().isHeldByCurrentThread();
+ }
+
private void lockForPutting() {
eventAddDrainLock.readLock().lock();
}
@@ -229,103 +318,5 @@ class ClientRegistrationEventQueueManager {
private void unlockForPutting() {
eventAddDrainLock.readLock().unlock();
}
-
- private void lockForSingleDrainer() {
- singleDrainerLock.lock();
- }
-
- private void unlockForSingleDrainer() {
- singleDrainerLock.unlock();
- }
- }
-
- private void processEventAndDeliverConflatable(final ClientProxyMembershipID
proxyID,
- final CacheClientNotifier cacheClientNotifier,
- final InternalCacheEvent internalCacheEvent,
- final Conflatable conflatable,
- final Set<ClientProxyMembershipID> originalFilterClientIDs) {
- // The first step is to repopulate the filter info for the event to
determine if
- // the client which was registering has a matching CQ or has registered
interest
- // in the key for this event. We need to get the filter profile, filter
routing info,
- // and local filter info in order to do so. If any of these are null, then
there is
- // no need to proceed as the client is not interested.
- FilterProfile filterProfile =
- ((LocalRegion) internalCacheEvent.getRegion()).getFilterProfile();
-
- if (filterProfile != null) {
- FilterRoutingInfo filterRoutingInfo =
- filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent);
-
- if (filterRoutingInfo != null) {
- FilterRoutingInfo.FilterInfo filterInfo =
filterRoutingInfo.getLocalFilterInfo();
-
- if (filterInfo != null) {
- ClientUpdateMessageImpl clientUpdateMessage = conflatable instanceof
HAEventWrapper
- ? (ClientUpdateMessageImpl) ((HAEventWrapper)
conflatable).getClientUpdateMessage()
- : (ClientUpdateMessageImpl) conflatable;
-
- internalCacheEvent.setLocalFilterInfo(filterInfo);
-
- Set<ClientProxyMembershipID> newFilterClientIDs =
- cacheClientNotifier.getFilterClientIDs(internalCacheEvent,
filterProfile,
- filterInfo,
- clientUpdateMessage);
-
- if (eventNotInOriginalFilterClientIDs(proxyID, newFilterClientIDs,
- originalFilterClientIDs)) {
- CacheClientProxy cacheClientProxy =
cacheClientNotifier.getClientProxy(proxyID);
-
- if (eventShouldBeDelivered(proxyID, newFilterClientIDs,
cacheClientProxy)) {
- cacheClientProxy.deliverMessage(conflatable);
- }
- }
- }
- }
- }
-
- // Once we have processed the conflatable, if it is an HAEventWrapper we
can
- // decrement the PutInProgress counter, allowing the ClientUpdateMessage
to be
- // set to null. See decrementPutInProgressCounter() for more details.
- if (conflatable instanceof HAEventWrapper) {
- ((HAEventWrapper) conflatable).decrementPutInProgressCounter();
- }
- }
-
- private boolean eventShouldBeDelivered(final ClientProxyMembershipID proxyID,
- final Set<ClientProxyMembershipID> filterClientIDs,
- final CacheClientProxy cacheClientProxy) {
- return filterClientIDs.contains(proxyID) && cacheClientProxy != null;
- }
-
- /*
- * This is to handle the edge case where the original filter client IDs
- * calculated by "normal" put processing did not include the registering
client
- * because the filter info had not been received yet, but we now found that
the client
- * is interested in the event so we should deliver it.
- */
- private boolean eventNotInOriginalFilterClientIDs(final
ClientProxyMembershipID proxyID,
- final Set<ClientProxyMembershipID> newFilterClientIDs,
- final Set<ClientProxyMembershipID> originalFilterClientIDs) {
- return originalFilterClientIDs == null
- || (!originalFilterClientIDs.contains(proxyID) &&
newFilterClientIDs.contains(proxyID));
- }
-
- /**
- * Represents a conflatable and event processed while a client was
registering.
- * This needs to be queued and processed after registration is complete. The
conflatable
- * is what we will actually be delivering to the MessageDispatcher (and
thereby adding
- * to the HARegionQueue). The internal cache event is required to rehydrate
the filter
- * info and determine if the client which was registering does have a CQ
that matches or
- * has registered interest in the key.
- */
- private class ClientRegistrationEvent {
- private final Conflatable conflatable;
- private final InternalCacheEvent internalCacheEvent;
-
- ClientRegistrationEvent(final InternalCacheEvent internalCacheEvent,
- final Conflatable conflatable) {
- this.conflatable = conflatable;
- this.internalCacheEvent = internalCacheEvent;
- }
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
index 33ec3aa..731c952 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
@@ -171,8 +171,6 @@ public class ClientUpdateMessageImpl implements
ClientUpdateMessage, Sizeable, N
public ClientUpdateMessageImpl(EnumListenerEvent operation, LocalRegion
region,
Object keyOfInterest, Object value, byte[] delta, byte valueIsObject,
Object callbackArgument,
ClientProxyMembershipID memberId, EventID eventIdentifier, VersionTag
versionTag) {
- // this._clientInterestList = new HashSet();
- // this._clientInterestListInv = new HashSet();
this._operation = operation;
this._regionName = region.getFullPath();
this._keyOfInterest = keyOfInterest;
@@ -196,8 +194,6 @@ public class ClientUpdateMessageImpl implements
ClientUpdateMessage, Sizeable, N
*/
protected ClientUpdateMessageImpl(EnumListenerEvent operation,
ClientProxyMembershipID memberId,
EventID eventIdentifier) {
- // this._clientInterestList = new HashSet();
- // this._clientInterestListInv = new HashSet();
this._operation = operation;
this._membershipId = memberId;
this._eventIdentifier = eventIdentifier;
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionTest.java
index 4708071..0b41015 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionTest.java
@@ -49,6 +49,7 @@ import
org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import
org.apache.geode.internal.cache.tier.sockets.ClientRegistrationEventQueueManager;
import org.apache.geode.internal.cache.tier.sockets.ConnectionListener;
import org.apache.geode.test.fake.Fakes;
@@ -342,8 +343,9 @@ public class BucketRegionTest {
doReturn(mock(SystemTimer.class)).when(cache).getCCPTimer();
CacheClientNotifier ccn =
- CacheClientNotifier.getInstance(cache, mock(CacheServerStats.class),
10,
- 10, mock(ConnectionListener.class), null, true);
+ CacheClientNotifier.getInstance(cache,
mock(ClientRegistrationEventQueueManager.class),
+ mock(CacheServerStats.class), 10, 10,
+ mock(ConnectionListener.class), null, true);
bucketRegion.notifyClientsOfTombstoneGC(regionGCVersions, keysRemoved,
eventID, routing);
verify(bucketRegion, never()).getFilterProfile();
@@ -365,8 +367,8 @@ public class BucketRegionTest {
doReturn(mock(SystemTimer.class)).when(cache).getCCPTimer();
CacheClientNotifier ccn =
- CacheClientNotifier.getInstance(cache, mock(CacheServerStats.class),
10,
- 10, mock(ConnectionListener.class), null, true);
+ CacheClientNotifier.getInstance(cache,
mock(ClientRegistrationEventQueueManager.class),
+ mock(CacheServerStats.class), 10, 10,
mock(ConnectionListener.class), null, true);
doReturn(mock(ClientProxyMembershipID.class)).when(proxy).getProxyID();
ccn.addClientProxyToMap(proxy);
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheServerImplTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheServerImplTest.java
index b8d5362..9a99b94 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheServerImplTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheServerImplTest.java
@@ -88,8 +88,9 @@ public class CacheServerImplTest {
@Test
public void createdAcceptorIsGatewayEndpoint() throws IOException {
OverflowAttributes overflowAttributes = mock(OverflowAttributes.class);
- InternalCacheServer server = new CacheServerImpl(cache, securityService,
new AcceptorBuilder(),
- true, true, () -> socketCreator, (a, b, c, d, e, f, g) ->
cacheClientNotifier,
+ InternalCacheServer server = new CacheServerImpl(cache, securityService,
+ new AcceptorBuilder(), true, true,
+ () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
(a, b, c) -> clientHealthMonitor, a -> advisor);
Acceptor acceptor = server.createAcceptor(overflowAttributes);
@@ -99,8 +100,9 @@ public class CacheServerImplTest {
@Test
public void getGroups_returnsSpecifiedGroup() {
- InternalCacheServer server = new CacheServerImpl(cache, securityService,
new AcceptorBuilder(),
- true, true, () -> socketCreator, (a, b, c, d, e, f, g) ->
cacheClientNotifier,
+ InternalCacheServer server = new CacheServerImpl(cache, securityService,
+ new AcceptorBuilder(), true, true,
+ () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
(a, b, c) -> clientHealthMonitor, a -> advisor);
String specifiedGroup = "group0";
@@ -112,8 +114,9 @@ public class CacheServerImplTest {
@Test
public void getGroups_returnsMultipleSpecifiedGroups() {
- InternalCacheServer server = new CacheServerImpl(cache, securityService,
new AcceptorBuilder(),
- true, true, () -> socketCreator, (a, b, c, d, e, f, g) ->
cacheClientNotifier,
+ InternalCacheServer server = new CacheServerImpl(cache, securityService,
+ new AcceptorBuilder(), true, true,
+ () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
(a, b, c) -> clientHealthMonitor, a -> advisor);
String specifiedGroup1 = "group1";
String specifiedGroup2 = "group2";
@@ -129,8 +132,9 @@ public class CacheServerImplTest {
public void getCombinedGroups_includesMembershipGroup() {
String membershipGroup = "group-m0";
when(config.getGroups()).thenReturn(membershipGroup);
- InternalCacheServer server = new CacheServerImpl(cache, securityService,
new AcceptorBuilder(),
- true, true, () -> socketCreator, (a, b, c, d, e, f, g) ->
cacheClientNotifier,
+ InternalCacheServer server = new CacheServerImpl(cache, securityService,
+ new AcceptorBuilder(), true, true,
+ () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
(a, b, c) -> clientHealthMonitor, a -> advisor);
assertThat(server.getCombinedGroups())
@@ -144,8 +148,9 @@ public class CacheServerImplTest {
String membershipGroup3 = "group-m3";
when(config.getGroups())
.thenReturn(membershipGroup1 + "," + membershipGroup2 + "," +
membershipGroup3);
- InternalCacheServer server = new CacheServerImpl(cache, securityService,
new AcceptorBuilder(),
- true, true, () -> socketCreator, (a, b, c, d, e, f, g) ->
cacheClientNotifier,
+ InternalCacheServer server = new CacheServerImpl(cache, securityService,
+ new AcceptorBuilder(), true, true,
+ () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
(a, b, c) -> clientHealthMonitor, a -> advisor);
assertThat(server.getCombinedGroups())
@@ -159,8 +164,9 @@ public class CacheServerImplTest {
String membershipGroup3 = "group-m3";
when(config.getGroups())
.thenReturn(membershipGroup1 + "," + membershipGroup2 + "," +
membershipGroup3);
- InternalCacheServer server = new CacheServerImpl(cache, securityService,
new AcceptorBuilder(),
- true, true, () -> socketCreator, (a, b, c, d, e, f, g) ->
cacheClientNotifier,
+ InternalCacheServer server = new CacheServerImpl(cache, securityService,
+ new AcceptorBuilder(), true, true,
+ () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
(a, b, c) -> clientHealthMonitor, a -> advisor);
String specifiedGroup1 = "group1";
String specifiedGroup2 = "group2";
@@ -175,8 +181,9 @@ public class CacheServerImplTest {
@Test
public void startNotifiesResourceEventCacheServerStart() throws IOException {
- InternalCacheServer server = new CacheServerImpl(cache, securityService,
new AcceptorBuilder(),
- true, true, () -> socketCreator, (a, b, c, d, e, f, g) ->
cacheClientNotifier,
+ InternalCacheServer server = new CacheServerImpl(cache, securityService,
+ new AcceptorBuilder(), true, true,
+ () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
(a, b, c) -> clientHealthMonitor, a -> advisor);
server.start();
@@ -186,8 +193,9 @@ public class CacheServerImplTest {
@Test
public void stopNotifiesResourceEventCacheServerStart() throws IOException {
- InternalCacheServer server = new CacheServerImpl(cache, securityService,
new AcceptorBuilder(),
- true, true, () -> socketCreator, (a, b, c, d, e, f, g) ->
cacheClientNotifier,
+ InternalCacheServer server = new CacheServerImpl(cache, securityService,
+ new AcceptorBuilder(), true, true,
+ () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
(a, b, c) -> clientHealthMonitor, a -> advisor);
server.start();
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionPartialMockTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionPartialMockTest.java
index c345d6b..8d4e38c 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionPartialMockTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionPartialMockTest.java
@@ -51,6 +51,7 @@ import
org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import
org.apache.geode.internal.cache.tier.sockets.ClientRegistrationEventQueueManager;
import org.apache.geode.internal.cache.tier.sockets.ConnectionListener;
public class LocalRegionPartialMockTest {
@@ -226,8 +227,9 @@ public class LocalRegionPartialMockTest {
when(cache.getCCPTimer()).thenReturn(mock(SystemTimer.class));
CacheClientNotifier ccn =
- CacheClientNotifier.getInstance(cache, mock(CacheServerStats.class),
10,
- 10, mock(ConnectionListener.class), null, true);
+ CacheClientNotifier.getInstance(cache,
mock(ClientRegistrationEventQueueManager.class),
+ mock(CacheServerStats.class), 10, 10,
+ mock(ConnectionListener.class), null, true);
doCallRealMethod().when(region).notifyClientsOfTombstoneGC(regionGCVersions,
keysRemoved,
eventID, routing);
@@ -248,8 +250,9 @@ public class LocalRegionPartialMockTest {
when(cache.getCCPTimer()).thenReturn(mock(SystemTimer.class));
CacheClientNotifier ccn =
- CacheClientNotifier.getInstance(cache, mock(CacheServerStats.class),
10,
- 10, mock(ConnectionListener.class), null, true);
+ CacheClientNotifier.getInstance(cache,
mock(ClientRegistrationEventQueueManager.class),
+ mock(CacheServerStats.class), 10, 10,
+ mock(ConnectionListener.class), null, true);
when(proxy.getProxyID()).thenReturn(mock(ClientProxyMembershipID.class));
ccn.addClientProxyToMap(proxy);
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
index a67b2d6..b01103f 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.internal.cache.tier.sockets;
+import static java.util.Collections.emptyList;
import static
org.apache.geode.cache.server.CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS;
import static
org.apache.geode.cache.server.CacheServer.DEFAULT_SOCKET_BUFFER_SIZE;
import static org.apache.geode.cache.server.CacheServer.DEFAULT_TCP_NO_DELAY;
@@ -29,7 +30,6 @@ import static org.mockito.quality.Strictness.STRICT_STUBS;
import java.net.ServerSocket;
import java.net.SocketAddress;
-import java.util.Collections;
import java.util.Properties;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
@@ -96,8 +96,8 @@ public class AcceptorImplTest {
DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache, MINIMUM_MAX_CONNECTIONS, 0,
CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT,
CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null,
null, DEFAULT_TCP_NO_DELAY, serverConnectionFactory, 1000,
securityService,
- () -> socketCreator, (a, b, c, d, e, f, g) -> cacheClientNotifier,
- (a, b, c) -> clientHealthMonitor, false, Collections.emptyList());
+ () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
+ (a, b, c) -> clientHealthMonitor, false, emptyList());
assertThat(acceptor.isGatewayReceiver()).isFalse();
}
@@ -114,8 +114,8 @@ public class AcceptorImplTest {
DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache, MINIMUM_MAX_CONNECTIONS, 0,
CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT,
CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null,
null, DEFAULT_TCP_NO_DELAY, serverConnectionFactory, 1000,
securityService,
- () -> socketCreator, (a, b, c, d, e, f, g) -> cacheClientNotifier,
- (a, b, c) -> clientHealthMonitor, true, Collections.emptyList());
+ () -> socketCreator, (a, b, c, d, e, f, g, h) -> cacheClientNotifier,
+ (a, b, c) -> clientHealthMonitor, true, emptyList());
assertThat(acceptor.isGatewayReceiver()).isTrue();
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
index 31f059b..d48ab37 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
@@ -12,11 +12,13 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache.tier.sockets;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -48,9 +50,11 @@ import org.apache.geode.internal.cache.FilterProfile;
import org.apache.geode.internal.cache.FilterRoutingInfo;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalCacheEvent;
+import org.apache.geode.internal.cache.RegionQueueException;
import org.apache.geode.test.fake.Fakes;
public class CacheClientNotifierTest {
+
@Test
public void
eventsInClientRegistrationQueueAreSentToClientAfterRegistrationIsComplete()
throws IOException, ClassNotFoundException, NoSuchMethodException,
IllegalAccessException,
@@ -63,11 +67,13 @@ public class CacheClientNotifierTest {
CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
ClientUpdateMessageImpl clientUpdateMessage =
mock(ClientUpdateMessageImpl.class);
ClientRegistrationMetadata clientRegistrationMetadata =
mock(ClientRegistrationMetadata.class);
+
when(clientRegistrationMetadata.getClientProxyMembershipID()).thenReturn(
clientProxyMembershipID);
CacheClientNotifier cacheClientNotifier =
CacheClientNotifier.getInstance(internalCache,
- cacheServerStats, 0, 0, connectionListener, null, false);
+ new ClientRegistrationEventQueueManager(), cacheServerStats, 0, 0,
+ connectionListener, null, false);
final CacheClientNotifier cacheClientNotifierSpy =
spy(cacheClientNotifier);
CountDownLatch waitForEventDispatchCountdownLatch = new CountDownLatch(1);
@@ -137,6 +143,48 @@ public class CacheClientNotifierTest {
verify(cacheClientProxy,
times(1)).deliverMessage(isA(HAEventWrapper.class));
}
+ @Test
+ public void clientRegistrationFailsQueueStillDrained()
+ throws ClassNotFoundException, NoSuchMethodException,
InvocationTargetException,
+ IllegalAccessException, IOException {
+ InternalCache internalCache = Fakes.cache();
+ CacheServerStats cacheServerStats = mock(CacheServerStats.class);
+ Socket socket = mock(Socket.class);
+ ConnectionListener connectionListener = mock(ConnectionListener.class);
+ ClientRegistrationMetadata clientRegistrationMetadata =
mock(ClientRegistrationMetadata.class);
+ ClientProxyMembershipID clientProxyMembershipID =
mock(ClientProxyMembershipID.class);
+ ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
+ mock(ClientRegistrationEventQueueManager.class);
+ ClientRegistrationEventQueueManager.ClientRegistrationEventQueue
clientRegistrationEventQueue =
+
mock(ClientRegistrationEventQueueManager.ClientRegistrationEventQueue.class);
+
+ when(clientRegistrationMetadata.getClientProxyMembershipID()).thenReturn(
+ clientProxyMembershipID);
+
when(clientRegistrationEventQueueManager.create(eq(clientProxyMembershipID),
any(), any()))
+ .thenReturn(clientRegistrationEventQueue);
+
+ CacheClientNotifier cacheClientNotifier =
CacheClientNotifier.getInstance(internalCache,
+ clientRegistrationEventQueueManager, cacheServerStats, 0, 0,
+ connectionListener, null, false);
+ CacheClientNotifier cacheClientNotifierSpy = spy(cacheClientNotifier);
+
+ doAnswer((i) -> {
+ throw new RegionQueueException();
+
}).when(cacheClientNotifierSpy).registerClientInternal(clientRegistrationMetadata,
socket,
+ false, 0, true);
+
+ assertThatThrownBy(() ->
cacheClientNotifierSpy.registerClient(clientRegistrationMetadata,
+ socket, false, 0, true))
+ .isInstanceOf(IOException.class);
+
+ verify(clientRegistrationEventQueueManager, times(1)).create(
+ eq(clientProxyMembershipID), any(), any());
+
+ verify(clientRegistrationEventQueueManager, times(1)).drain(
+ eq(clientRegistrationEventQueue),
+ eq(cacheClientNotifierSpy));
+ }
+
private InternalCacheEvent createMockInternalCacheEvent(
final ClientProxyMembershipID clientProxyMembershipID,
final ClientUpdateMessageImpl clientUpdateMessage,
@@ -188,8 +236,10 @@ public class CacheClientNotifierTest {
InternalCache internalCache = Fakes.cache();
CacheClientNotifier ccn =
- CacheClientNotifier.getInstance(internalCache,
mock(CacheServerStats.class), 10,
- 10, mock(ConnectionListener.class), null, true);
+ CacheClientNotifier.getInstance(internalCache,
+ mock(ClientRegistrationEventQueueManager.class),
+ mock(CacheServerStats.class), 10, 10,
+ mock(ConnectionListener.class), null, true);
assertFalse(CacheClientNotifier.singletonHasClientProxies());
ccn.shutdown(111);
@@ -202,8 +252,10 @@ public class CacheClientNotifierTest {
CacheClientProxy proxy = mock(CacheClientProxy.class);
CacheClientNotifier ccn =
- CacheClientNotifier.getInstance(internalCache,
mock(CacheServerStats.class), 10,
- 10, mock(ConnectionListener.class), null, true);
+ CacheClientNotifier.getInstance(internalCache,
+ mock(ClientRegistrationEventQueueManager.class),
+ mock(CacheServerStats.class), 10, 10,
+ mock(ConnectionListener.class), null, true);
when(proxy.getProxyID()).thenReturn(mock(ClientProxyMembershipID.class));
ccn.addClientProxy(proxy);
@@ -221,8 +273,10 @@ public class CacheClientNotifierTest {
CacheClientProxy proxy = mock(CacheClientProxy.class);
CacheClientNotifier ccn =
- CacheClientNotifier.getInstance(internalCache,
mock(CacheServerStats.class), 10,
- 10, mock(ConnectionListener.class), null, true);
+ CacheClientNotifier.getInstance(internalCache,
+ mock(ClientRegistrationEventQueueManager.class),
+ mock(CacheServerStats.class), 10, 10,
+ mock(ConnectionListener.class), null, true);
when(proxy.getProxyID()).thenReturn(mock(ClientProxyMembershipID.class));
ccn.addClientInitProxy(proxy);
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
index f392e7b..16a4042 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
@@ -16,20 +16,21 @@
package org.apache.geode.internal.cache.tier.sockets;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.junit.Test;
@@ -44,35 +45,15 @@ import org.apache.geode.internal.cache.LocalRegion;
public class ClientRegistrationEventQueueManagerTest {
@Test
- public void
messageDeliveredAfterRegisteringOnDrainIfNewFilterIDsIncludesClient()
- throws ExecutionException, InterruptedException {
+ public void
messageDeliveredAfterRegisteringOnDrainIfNewFilterIDsIncludesClient() {
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
ClientProxyMembershipID clientProxyMembershipID =
mock(ClientProxyMembershipID.class);
- ReadWriteLock mockPutDrainLock = mock(ReadWriteLock.class);
- ReadWriteLock actualPutDrainLock = new ReentrantReadWriteLock();
-
- CountDownLatch waitForDrainAndRemoveLatch = new CountDownLatch(1);
- CountDownLatch waitForAddInProgressLatch = new CountDownLatch(1);
-
- when(mockPutDrainLock.readLock())
- .thenAnswer(i -> {
- waitForAddInProgressLatch.countDown();
- waitForDrainAndRemoveLatch.await();
- return actualPutDrainLock.readLock();
- });
-
- when(mockPutDrainLock.writeLock())
- .thenAnswer(i -> {
- waitForAddInProgressLatch.await();
- waitForDrainAndRemoveLatch.countDown();
- return actualPutDrainLock.writeLock();
- });
-
- clientRegistrationEventQueueManager.create(clientProxyMembershipID,
- new ConcurrentLinkedQueue<>(), mockPutDrainLock, new ReentrantLock());
+ ClientRegistrationEventQueueManager.ClientRegistrationEventQueue
clientRegistrationEventQueue =
+ clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+ new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
LocalRegion localRegion = mock(LocalRegion.class);
@@ -97,24 +78,18 @@ public class ClientRegistrationEventQueueManagerTest {
clientUpdateMessage))
.thenReturn(recalculatedFilterClientIDs);
CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
+ when(cacheClientProxy.getProxyID()).thenReturn(clientProxyMembershipID);
when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
// Create empty filter client IDs produced by the "normal" put processing
path, so we can test
// that the event is still delivered if the client finished registering
and needs the event.
Set<ClientProxyMembershipID> normalPutFilterClientIDs = new HashSet<>();
- CompletableFuture<Void> addEventsToQueueTask =
CompletableFuture.runAsync(() -> {
- // In thread one, we add and event to the queue
- clientRegistrationEventQueueManager
- .add(internalCacheEvent, clientUpdateMessage,
normalPutFilterClientIDs,
- cacheClientNotifier);
- });
- CompletableFuture<Void> drainEventsFromQueueTask =
CompletableFuture.runAsync(() -> {
- // In thread two, we drain the event from the queue
- clientRegistrationEventQueueManager.drain(clientProxyMembershipID,
cacheClientNotifier);
- });
+ clientRegistrationEventQueueManager
+ .add(internalCacheEvent, clientUpdateMessage, normalPutFilterClientIDs,
+ cacheClientNotifier);
- CompletableFuture.allOf(addEventsToQueueTask,
drainEventsFromQueueTask).get();
+ clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue,
cacheClientNotifier);
// The client update message should still be delivered because it is now
part of the
// filter clients interested in this event, despite having not been
included in the original
@@ -130,7 +105,7 @@ public class ClientRegistrationEventQueueManagerTest {
ClientProxyMembershipID clientProxyMembershipID =
mock(ClientProxyMembershipID.class);
clientRegistrationEventQueueManager.create(clientProxyMembershipID,
- new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock(), new
ReentrantLock());
+ new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
@@ -156,13 +131,43 @@ public class ClientRegistrationEventQueueManagerTest {
}
@Test
- public void addAndDrainQueueContentionTest() throws ExecutionException,
InterruptedException {
+ public void
putInProgressCounterIncrementedOnAddAndDecrementedOnRemoveForAllEvents() {
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
ClientProxyMembershipID clientProxyMembershipID =
mock(ClientProxyMembershipID.class);
- ReadWriteLock mockPutDrainLock = mock(ReadWriteLock.class);
- ReadWriteLock actualPutDrainLock = new ReentrantReadWriteLock();
+
+ ClientRegistrationEventQueueManager.ClientRegistrationEventQueue
clientRegistrationEventQueue =
+ clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+ new ConcurrentLinkedQueue<>(),
+ new ReentrantReadWriteLock());
+
+ List<HAEventWrapper> haEventWrappers = new ArrayList<>();
+ CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
+
+ for (int i = 0; i < 5; ++i) {
+ HAEventWrapper haEventWrapper = mock(HAEventWrapper.class);
+ haEventWrappers.add(haEventWrapper);
+ InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
+ when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
+
when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
+ clientRegistrationEventQueueManager.add(internalCacheEvent,
+ haEventWrapper, new HashSet<>(), cacheClientNotifier);
+ verify(haEventWrapper,
times(1)).incrementPutInProgressCounter(anyString());
+ }
+
+ clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue,
cacheClientNotifier);
+
+ for (HAEventWrapper haEventWrapper : haEventWrappers) {
+ verify(haEventWrapper, times(1)).decrementPutInProgressCounter();
+ }
+ }
+
+ @Test
+ public void addAndDrainQueueContentionTest() throws ExecutionException,
InterruptedException {
+ ClientProxyMembershipID clientProxyMembershipID =
mock(ClientProxyMembershipID.class);
+ ReentrantReadWriteLock mockPutDrainLock =
mock(ReentrantReadWriteLock.class);
+ ReentrantReadWriteLock actualPutDrainLock = new ReentrantReadWriteLock();
when(mockPutDrainLock.readLock())
.thenReturn(actualPutDrainLock.readLock());
@@ -174,9 +179,12 @@ public class ClientRegistrationEventQueueManagerTest {
return actualPutDrainLock.writeLock();
});
+ ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
+ new ClientRegistrationEventQueueManager();
+
ClientRegistrationEventQueueManager.ClientRegistrationEventQueue
clientRegistrationEventQueue =
clientRegistrationEventQueueManager.create(clientProxyMembershipID,
- new ConcurrentLinkedQueue<>(), mockPutDrainLock, new
ReentrantLock());
+ new ConcurrentLinkedQueue<>(), mockPutDrainLock);
InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
@@ -185,6 +193,8 @@ public class ClientRegistrationEventQueueManagerTest {
Conflatable conflatable = mock(Conflatable.class);
Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
+ CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
+
when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
CompletableFuture<Void> addEventsToQueueTask =
CompletableFuture.runAsync(() -> {
for (int numAdds = 0; numAdds < 100000; ++numAdds) {
@@ -196,7 +206,7 @@ public class ClientRegistrationEventQueueManagerTest {
CompletableFuture<Void> drainEventsFromQueueTask =
CompletableFuture.runAsync(() -> {
// In thread two, we drain events from the queue
- clientRegistrationEventQueueManager.drain(clientProxyMembershipID,
cacheClientNotifier);
+ clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue,
cacheClientNotifier);
});
CompletableFuture.allOf(addEventsToQueueTask,
drainEventsFromQueueTask).get();
@@ -205,77 +215,154 @@ public class ClientRegistrationEventQueueManagerTest {
}
@Test
- public void twoThreadsRegisteringSameClientNoEventsLost()
- throws ExecutionException, InterruptedException {
- ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
- new ClientRegistrationEventQueueManager();
-
- InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
+ public void addEventWithOffheapValueCopiedToHeap() {
+ EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
- when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
+ Operation mockOperation = mock(Operation.class);
+ when(mockOperation.isEntry()).thenReturn(true);
+ when(internalCacheEvent.getOperation()).thenReturn(mockOperation);
Conflatable conflatable = mock(Conflatable.class);
Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
ClientProxyMembershipID clientProxyMembershipID =
mock(ClientProxyMembershipID.class);
+ ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
+ new ClientRegistrationEventQueueManager();
+
+ clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+ new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
+
+ clientRegistrationEventQueueManager
+ .add(internalCacheEvent, conflatable, filterClientIDs,
cacheClientNotifier);
+
+ verify(internalCacheEvent, times(1)).copyOffHeapToHeap();
+ }
+
+ @Test
+ public void clientWasNeverRegisteredDrainQueueStillRemoved() {
+ ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
+ new ClientRegistrationEventQueueManager();
+
+ CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
+ ClientProxyMembershipID clientProxyMembershipID =
mock(ClientProxyMembershipID.class);
+
ClientRegistrationEventQueueManager.ClientRegistrationEventQueue
clientRegistrationEventQueue =
clientRegistrationEventQueueManager.create(clientProxyMembershipID,
- new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock(), new
ReentrantLock());
-
- for (int registrationIterations = 0; registrationIterations < 1000;
++registrationIterations) {
- Runnable clientRegistrationSimulation = () -> {
- for (int numAdds = 0; numAdds < getRandomNumberOfAdds(); ++numAdds) {
- // In thread one, we add events to the queue
- clientRegistrationEventQueueManager
- .add(internalCacheEvent, conflatable, filterClientIDs,
cacheClientNotifier);
- }
- // In thread two, we drain events from the queue
- clientRegistrationEventQueueManager.drain(clientProxyMembershipID,
cacheClientNotifier);
- };
-
- CompletableFuture<Void> registrationFutureOne =
- CompletableFuture.runAsync(clientRegistrationSimulation);
- CompletableFuture<Void> registrationFutureTwo =
- CompletableFuture.runAsync(clientRegistrationSimulation);
-
- CompletableFuture.allOf(registrationFutureOne,
registrationFutureTwo).get();
-
- assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
- }
+ new ConcurrentLinkedQueue<>(),
+ new ReentrantReadWriteLock());
+
+ clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue,
cacheClientNotifier);
+
+ EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
+ Conflatable conflatable = mock(Conflatable.class);
+ Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
+
+ // Pass a new event to the ClientRegistrationEventQueueManager. This event
should not be added
+ // to the test client's registration queue, because it should already be
removed. We can
+ // validate that by asserting that the client's registration queue is
empty after the add.
+ clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable,
filterClientIDs,
+ cacheClientNotifier);
+
+ assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
}
@Test
- public void addEventWithOffheapValueCopiedToHeap() {
+ public void drainThrowsExceptionQueueStillRemoved() {
+ CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
+ CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
+ ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
+ when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
+
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
+ ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
+ clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+ new ConcurrentLinkedQueue<>(),
+ new ReentrantReadWriteLock());
+
+ Conflatable conflatable = mock(Conflatable.class);
+ Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
+
EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
- when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
+ RuntimeException testException = new RuntimeException();
+ when(internalCacheEvent.getRegion()).thenThrow(testException);
Operation mockOperation = mock(Operation.class);
when(mockOperation.isEntry()).thenReturn(true);
when(internalCacheEvent.getOperation()).thenReturn(mockOperation);
- Conflatable conflatable = mock(Conflatable.class);
- Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
- CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
- ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
+ clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
+ cacheClientNotifier);
- clientRegistrationEventQueueManager.create(clientProxyMembershipID,
- new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock(), new ReentrantLock());
+ assertThatThrownBy(() -> clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue,
+ cacheClientNotifier))
+ .isEqualTo(testException);
- clientRegistrationEventQueueManager
- .add(internalCacheEvent, conflatable, filterClientIDs, cacheClientNotifier);
+ // Pass a new event to the ClientRegistrationEventQueueManager. This event should not be added
+ // to the test client's registration queue, because it should already be removed. We can
+ // validate that by asserting that the client's registration queue is empty after the add.
+ clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
+ cacheClientNotifier);
- verify(internalCacheEvent, times(1)).copyOffHeapToHeap();
+ assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
}
- /*
- * This helps to create contention between registration threads during the drain phase
- */
- private static int getRandomNumberOfAdds() {
- int min = 10_000;
- int max = 50_000;
- return ThreadLocalRandom.current().nextInt(min, max + 1);
+ @Test
+ public void addEventInOriginalFilterIDsButQueueWasRemovedDueToSuccessfulRegistrationSoEventNotRedelivered() {
+ ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
+ CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
+ CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
+ when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
+ Set<ClientProxyMembershipID> originalFilterIDs = new HashSet<>();
+ originalFilterIDs.add(clientProxyMembershipID);
+
+ ClientUpdateMessageImpl clientUpdateMessage = mock(ClientUpdateMessageImpl.class);
+
+ InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
+ LocalRegion localRegion = mock(LocalRegion.class);
+ FilterProfile filterProfile = mock(FilterProfile.class);
+ FilterRoutingInfo filterRoutingInfo = mock(FilterRoutingInfo.class);
+ FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
+
+ when(filterRoutingInfo.getLocalFilterInfo()).thenReturn(
+ filterInfo);
+ when(filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent))
+ .thenReturn(filterRoutingInfo);
+ when(localRegion.getFilterProfile()).thenReturn(filterProfile);
+ when(internalCacheEvent.getRegion()).thenReturn(localRegion);
+ when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
+
+ Set<ClientProxyMembershipID> recalculatedFilterClientIDs = new HashSet<>();
+ recalculatedFilterClientIDs.add(clientProxyMembershipID);
+ when(cacheClientNotifier.getFilterClientIDs(internalCacheEvent, filterProfile, filterInfo,
+ clientUpdateMessage))
+ .thenReturn(recalculatedFilterClientIDs);
+ when(cacheClientProxy.getProxyID()).thenReturn(clientProxyMembershipID);
+ when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
+ ReentrantReadWriteLock mockReadWriteLock = mock(ReentrantReadWriteLock.class);
+
+ ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
+ new ClientRegistrationEventQueueManager();
+
+ ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
+ clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+ new ConcurrentLinkedQueue<>(),
+ mockReadWriteLock);
+
+ ReentrantReadWriteLock.ReadLock mockReadLock = mock(ReentrantReadWriteLock.ReadLock.class);
+ when(mockReadWriteLock.readLock()).thenReturn(mockReadLock);
+ ReentrantReadWriteLock actualPutDrainLock = new ReentrantReadWriteLock();
+ when(mockReadWriteLock.writeLock()).thenReturn(actualPutDrainLock.writeLock());
+ doAnswer(i -> {
+ clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
+ actualPutDrainLock.readLock();
+ return null;
+ }).when(mockReadLock).lock();
+
+ clientRegistrationEventQueueManager.add(internalCacheEvent, clientUpdateMessage,
+ originalFilterIDs, cacheClientNotifier);
+
+ verify(cacheClientProxy, times(0)).deliverMessage(clientUpdateMessage);
}
}