This is an automated email from the ASF dual-hosted git repository. ladyvader pushed a commit to branch feature/giiQueueing in repository https://gitbox.apache.org/repos/asf/geode.git
commit 205ca81a6260bc9a08f277b4885a0fc3710159ca Author: ladyVader <[email protected]> AuthorDate: Thu Jul 19 11:06:05 2018 -0700 GEM-2052: updates to giiQueueing and respective test. --- .../cache/ha/HARegionQueueIntegrationTest.java | 11 ++--- .../internal/cache/ha/HARegionQueueJUnitTest.java | 42 +++++++++++++++++++ .../geode/internal/cache/ha/HARegionQueue.java | 49 +++++----------------- .../tier/sockets/ClientUpdateMessageImpl.java | 1 + 4 files changed, 59 insertions(+), 44 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java index 8a496d1..972d5f2 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java @@ -18,7 +18,6 @@ import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.spy; @@ -137,6 +136,7 @@ public class HARegionQueueIntegrationTest { new ClientProxyMembershipID(), new EventID(cache.getDistributedSystem())); HAEventWrapper wrapper = new HAEventWrapper(message); wrapper.setHAContainer(haContainerWrapper); + wrapper.incrementPutRefCount(); // Create and update HARegionQueues forcing one queue to startGiiQueueing int numQueues = 10; @@ -147,16 +147,17 @@ public class HARegionQueueIntegrationTest { assertEquals(1, haContainerWrapper.size()); HAEventWrapper wrapperInContainer = (HAEventWrapper) haContainerWrapper.getKey(wrapper); - assertEquals(numQueues, wrapperInContainer.getReferenceCount()); + assertEquals(numQueues - 1, wrapperInContainer.getReferenceCount()); + assertTrue(wrapperInContainer.getPutInProgress()); - // Verify that the HAEventWrapper in the giiQueue now has msg = null - // this gets set to null when wrapper is added to HAContainer (for non-gii queues) + // Verify that the HAEventWrapper in the giiQueue now has msg != null + // We don't null this out while putInProgress > 0 (true) Queue giiQueue = targetQueue.getGiiQueue(); assertEquals(1, giiQueue.size()); HAEventWrapper giiQueueEntry = (HAEventWrapper) giiQueue.peek(); assertNotNull(giiQueueEntry); - assertNull(giiQueueEntry.getClientUpdateMessage()); + assertNotNull(giiQueueEntry.getClientUpdateMessage()); // endGiiQueueing and verify queue empty and putEventInHARegion invoked with HAEventWrapper // not ClientUpdateMessageImpl diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java index 8ceb028..4ecc019 100755 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -1755,6 +1756,47 @@ public class HARegionQueueJUnitTest { verify(mockHAContainer, times(1)).remove(mockHAEventWrapper); } + @Test + public void testPutEntryConditionallyIntoHAContainerUpdatesInterestList() throws Exception { + final String haRegionName = testName.getMethodName(); + + HARegionQueue regionQueue = + createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE); + + ClientProxyMembershipID mockClientProxyMembershipId = mock(ClientProxyMembershipID.class); + CacheClientProxy mockCacheClientProxy = mock(CacheClientProxy.class); + + doReturn(mockClientProxyMembershipId).when(mockCacheClientProxy).getProxyID(); + ((HAContainerWrapper) regionQueue.haContainer).putProxy(haRegionName, mockCacheClientProxy); + + EventID mockEventID = mock(EventID.class); + ClientUpdateMessageImpl mockClientUpdateMessage = mock(ClientUpdateMessageImpl.class); + mockClientUpdateMessage.setEventIdentifier(mockEventID); + + doReturn(true).when(mockClientUpdateMessage) + .isClientInterestedInUpdates(mockClientProxyMembershipId); + + HAEventWrapper originalHAEventWrapper = new HAEventWrapper(mockEventID); + originalHAEventWrapper.setClientUpdateMessage(mockClientUpdateMessage); + + // allow putInProgress to be false (so we null out the msg field in the wrapper) + regionQueue.putEntryConditionallyIntoHAContainer(originalHAEventWrapper); + + // the initial haContainer.putIfAbsent() doesn't need to invoke addClientInterestList + // as it is already part of the original message + verify(mockClientUpdateMessage, times(0)).addClientInterestList(mockClientProxyMembershipId, + true); + + // create a new wrapper with the same id and message + HAEventWrapper newHAEventWrapper = new HAEventWrapper(mockEventID); + newHAEventWrapper.setClientUpdateMessage(mockClientUpdateMessage); + + regionQueue.putEntryConditionallyIntoHAContainer(newHAEventWrapper); + + // Verify that the original haContainerEntry gets the updated clientInterestList + verify(mockClientUpdateMessage, times(1)).addClientInterestList(mockClientProxyMembershipId, + true); + } /** * Wait until a given runnable stops throwing exceptions. It should take at least diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java index 3624101..69dfd6d 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java @@ -605,6 +605,8 @@ public class HARegionQueue implements RegionQueue { this.giiLock.readLock().lock(); // fix for bug #41681 - durable client misses event try { if (this.giiCount > 0) { + logger.info("{}: adding message to GII queue of size {}: {}", this.regionName, + giiQueue.size(), object); if (logger.isDebugEnabled()) { logger.debug("{}: adding message to GII queue of size {}: {}", this.regionName, giiQueue.size(), object); @@ -614,15 +616,8 @@ public class HARegionQueue implements RegionQueue { if (object instanceof HAEventWrapper) { HAEventWrapper wrapper = (HAEventWrapper) object; - boolean cumiNull = wrapper.getClientUpdateMessage() == null; - // bug #43609 - prevent loss of the message while in the queue - logger.info("RYGUY: GII Queueing - Putting conditionally into HA container. Event ID: " - + wrapper.hashCode() + "; System ID: " + System.identityHashCode(wrapper) - + "; CUMI Null: " - + cumiNull + "; ToString: " - + wrapper); + // use to putRefCount to prevent loss of the message while in the queue wrapper.incrementPutRefCount(); - putEntryConditionallyIntoHAContainer(wrapper); } this.giiQueue.add(object); @@ -771,23 +766,16 @@ public class HARegionQueue implements RegionQueue { } actualCount++; try { + logger.info("{} draining #{}: {}", this.regionName, (actualCount + 1), value); if (isDebugEnabled) { logger.debug("{} draining #{}: {}", this.regionName, (actualCount + 1), value); } if (value instanceof HAEventWrapper) { - // TODO: RYGUY: This is a bandaid and we may not need it. if (((HAEventWrapper) value).getClientUpdateMessage() == null) { - // if there is no wrapped message look for it in the HA container map - ClientUpdateMessageImpl haContainerMessage = - (ClientUpdateMessageImpl) haContainer.get(value); - if (haContainerMessage != null) { - ((HAEventWrapper) value).setClientUpdateMessage(haContainerMessage); - } else { - logger.info( - "RYGUY: {} ATTENTION: found gii queued event with null event message. Please see bug #44852: {}", - this.regionName, value); - continue; - } + logger.info( + "RYGUY: {} ATTENTION: found gii queued event with null event message. Please see bug #44852: {}", + this.regionName, value); + continue; } } basicPut(value); @@ -795,26 +783,11 @@ public class HARegionQueue implements RegionQueue { // incremented when it was queued in giiQueue. if (value instanceof HAEventWrapper) { HAEventWrapper wrapper = (HAEventWrapper) value; - wrapper.decrementPutRefCount(); - + // if putInProgress is false, the clientUpdateMessage is safely in the HAContainer if (!wrapper.getPutInProgress()) { wrapper.setClientUpdateMessage(null); } - - // TODO: RYGUY: The put ref count should cover us and we no longer need to bump/dec - // the - // HAEventWrapper ref count. If the wrapper was removed from the container, say by - // QRM, we will - // just do putIfAbsent and add it back in. The CUMI will not be null because we had - // not decremented - // the putRefCount yet when we did basicPut(). - - logger.info("RYGUY: GII Decrementing Event ID: " + wrapper.hashCode() + "; Region: " - + this.regionName + "; System identity: " - + System.identityHashCode(wrapper) + "; ToString: " + wrapper); - - decAndRemoveFromHAContainer((HAEventWrapper) value); } } catch (NoSuchElementException ignore) { break; @@ -3609,7 +3582,7 @@ public class HARegionQueue implements RegionQueue { // After the initial put to the container, the client update message is set to null. // Therefore we check if it is null and only add client CQs and interest lists if it // is not. - if (haContainerKey.getClientUpdateMessage() != null) { + if (inputHaEventWrapper.getClientUpdateMessage() != null) { addClientCQsAndInterestList(haContainerEntry, inputHaEventWrapper, this.haContainer, this.regionName); } @@ -3726,8 +3699,6 @@ public class HARegionQueue implements RegionQueue { wrapperSet.add(this.region.get(wrapperArray[i])); } - logger.info("RYGUY: Destroying HARegion in updateHAContainer()", new Exception()); - // Start a new thread which will update the clientMessagesRegion for // each of the HAEventWrapper instances present in the wrapperSet Thread regionCleanupTask = new Thread(new Runnable() { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java index bc33dd2..f0cded5 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java @@ -1213,6 +1213,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N .append(";eventId=").append(_eventIdentifier).append(";shouldConflate=") .append(_shouldConflate).append(";versionTag=").append(this.versionTag).append(";hasCqs=") .append(this._hasCqs) + .append(";clientInterestList=" + this._clientInterestList) // skip _logger :-) .append("]"); return buffer.toString();
