This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a commit to branch feature/giiQueueing
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 205ca81a6260bc9a08f277b4885a0fc3710159ca
Author: ladyVader <[email protected]>
AuthorDate: Thu Jul 19 11:06:05 2018 -0700

    GEM-2052: updates to giiQueueing and respective test.
---
 .../cache/ha/HARegionQueueIntegrationTest.java     | 11 ++---
 .../internal/cache/ha/HARegionQueueJUnitTest.java  | 42 +++++++++++++++++++
 .../geode/internal/cache/ha/HARegionQueue.java     | 49 +++++-----------------
 .../tier/sockets/ClientUpdateMessageImpl.java      |  1 +
 4 files changed, 59 insertions(+), 44 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
index 8a496d1..972d5f2 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
@@ -18,7 +18,6 @@ import static 
org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.spy;
@@ -137,6 +136,7 @@ public class HARegionQueueIntegrationTest {
         new ClientProxyMembershipID(), new 
EventID(cache.getDistributedSystem()));
     HAEventWrapper wrapper = new HAEventWrapper(message);
     wrapper.setHAContainer(haContainerWrapper);
+    wrapper.incrementPutRefCount();
 
     // Create and update HARegionQueues forcing one queue to startGiiQueueing
     int numQueues = 10;
@@ -147,16 +147,17 @@ public class HARegionQueueIntegrationTest {
     assertEquals(1, haContainerWrapper.size());
 
     HAEventWrapper wrapperInContainer = (HAEventWrapper) 
haContainerWrapper.getKey(wrapper);
-    assertEquals(numQueues, wrapperInContainer.getReferenceCount());
+    assertEquals(numQueues - 1, wrapperInContainer.getReferenceCount());
+    assertTrue(wrapperInContainer.getPutInProgress());
 
-    // Verify that the HAEventWrapper in the giiQueue now has msg = null
-    // this gets set to null when wrapper is added to HAContainer (for non-gii 
queues)
+    // Verify that the HAEventWrapper in the giiQueue now has msg != null
+    // We don't null this out while putInProgress > 0 (true)
     Queue giiQueue = targetQueue.getGiiQueue();
     assertEquals(1, giiQueue.size());
 
     HAEventWrapper giiQueueEntry = (HAEventWrapper) giiQueue.peek();
     assertNotNull(giiQueueEntry);
-    assertNull(giiQueueEntry.getClientUpdateMessage());
+    assertNotNull(giiQueueEntry.getClientUpdateMessage());
 
     // endGiiQueueing and verify queue empty and putEventInHARegion invoked 
with HAEventWrapper
     // not ClientUpdateMessageImpl
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
index 8ceb028..4ecc019 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -1755,6 +1756,47 @@ public class HARegionQueueJUnitTest {
     verify(mockHAContainer, times(1)).remove(mockHAEventWrapper);
   }
 
+  @Test
+  public void testPutEntryConditionallyIntoHAContainerUpdatesInterestList() 
throws Exception {
+    final String haRegionName = testName.getMethodName();
+
+    HARegionQueue regionQueue =
+        createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE);
+
+    ClientProxyMembershipID mockClientProxyMembershipId = 
mock(ClientProxyMembershipID.class);
+    CacheClientProxy mockCacheClientProxy = mock(CacheClientProxy.class);
+
+    
doReturn(mockClientProxyMembershipId).when(mockCacheClientProxy).getProxyID();
+    ((HAContainerWrapper) regionQueue.haContainer).putProxy(haRegionName, 
mockCacheClientProxy);
+
+    EventID mockEventID = mock(EventID.class);
+    ClientUpdateMessageImpl mockClientUpdateMessage = 
mock(ClientUpdateMessageImpl.class);
+    mockClientUpdateMessage.setEventIdentifier(mockEventID);
+
+    doReturn(true).when(mockClientUpdateMessage)
+        .isClientInterestedInUpdates(mockClientProxyMembershipId);
+
+    HAEventWrapper originalHAEventWrapper = new HAEventWrapper(mockEventID);
+    originalHAEventWrapper.setClientUpdateMessage(mockClientUpdateMessage);
+
+    // allow putInProgress to be false (so we null out the msg field in the 
wrapper)
+    regionQueue.putEntryConditionallyIntoHAContainer(originalHAEventWrapper);
+
+    // the initial haContainer.putIfAbsent() doesn't need to invoke 
addClientInterestList
+    // as it is already part of the original message
+    verify(mockClientUpdateMessage, 
times(0)).addClientInterestList(mockClientProxyMembershipId,
+        true);
+
+    // create a new wrapper with the same id and message
+    HAEventWrapper newHAEventWrapper = new HAEventWrapper(mockEventID);
+    newHAEventWrapper.setClientUpdateMessage(mockClientUpdateMessage);
+
+    regionQueue.putEntryConditionallyIntoHAContainer(newHAEventWrapper);
+
+    // Verify that the original haContainerEntry gets the updated 
clientInterestList
+    verify(mockClientUpdateMessage, 
times(1)).addClientInterestList(mockClientProxyMembershipId,
+        true);
+  }
 
   /**
    * Wait until a given runnable stops throwing exceptions. It should take at 
least
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 3624101..69dfd6d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -605,6 +605,8 @@ public class HARegionQueue implements RegionQueue {
     this.giiLock.readLock().lock(); // fix for bug #41681 - durable client 
misses event
     try {
       if (this.giiCount > 0) {
+        logger.info("{}: adding message to GII queue of size {}: {}", 
this.regionName,
+            giiQueue.size(), object);
         if (logger.isDebugEnabled()) {
           logger.debug("{}: adding message to GII queue of size {}: {}", 
this.regionName,
               giiQueue.size(), object);
@@ -614,15 +616,8 @@ public class HARegionQueue implements RegionQueue {
         if (object instanceof HAEventWrapper) {
           HAEventWrapper wrapper = (HAEventWrapper) object;
 
-          boolean cumiNull = wrapper.getClientUpdateMessage() == null;
-          // bug #43609 - prevent loss of the message while in the queue
-          logger.info("RYGUY: GII Queueing - Putting conditionally into HA 
container. Event ID: "
-              + wrapper.hashCode() + "; System ID: " + 
System.identityHashCode(wrapper)
-              + "; CUMI Null: "
-              + cumiNull + "; ToString: "
-              + wrapper);
+          // use putRefCount to prevent loss of the message while in the queue
           wrapper.incrementPutRefCount();
-          putEntryConditionallyIntoHAContainer(wrapper);
         }
 
         this.giiQueue.add(object);
@@ -771,23 +766,16 @@ public class HARegionQueue implements RegionQueue {
           }
           actualCount++;
           try {
+            logger.info("{} draining #{}: {}", this.regionName, (actualCount + 
1), value);
             if (isDebugEnabled) {
               logger.debug("{} draining #{}: {}", this.regionName, 
(actualCount + 1), value);
             }
             if (value instanceof HAEventWrapper) {
-              // TODO: RYGUY: This is a bandaid and we may not need it.
               if (((HAEventWrapper) value).getClientUpdateMessage() == null) {
-                // if there is no wrapped message look for it in the HA 
container map
-                ClientUpdateMessageImpl haContainerMessage =
-                    (ClientUpdateMessageImpl) haContainer.get(value);
-                if (haContainerMessage != null) {
-                  ((HAEventWrapper) 
value).setClientUpdateMessage(haContainerMessage);
-                } else {
-                  logger.info(
-                      "RYGUY: {} ATTENTION: found gii queued event with null 
event message.  Please see bug #44852: {}",
-                      this.regionName, value);
-                  continue;
-                }
+                logger.info(
+                    "RYGUY: {} ATTENTION: found gii queued event with null 
event message.  Please see bug #44852: {}",
+                    this.regionName, value);
+                continue;
               }
             }
             basicPut(value);
@@ -795,26 +783,11 @@ public class HARegionQueue implements RegionQueue {
             // incremented when it was queued in giiQueue.
             if (value instanceof HAEventWrapper) {
               HAEventWrapper wrapper = (HAEventWrapper) value;
-
               wrapper.decrementPutRefCount();
-
+              // if putInProgress is false, the clientUpdateMessage is safely 
in the HAContainer
               if (!wrapper.getPutInProgress()) {
                 wrapper.setClientUpdateMessage(null);
               }
-
-              // TODO: RYGUY: The put ref count should cover us and we no 
longer need to bump/dec
-              // the
-              // HAEventWrapper ref count. If the wrapper was removed from the 
container, say by
-              // QRM, we will
-              // just do putIfAbsent and add it back in. The CUMI will not be 
null because we had
-              // not decremented
-              // the putRefCount yet when we did basicPut().
-
-              logger.info("RYGUY: GII Decrementing Event ID: " + 
wrapper.hashCode() + "; Region: "
-                  + this.regionName + "; System identity: "
-                  + System.identityHashCode(wrapper) + "; ToString: " + 
wrapper);
-
-              decAndRemoveFromHAContainer((HAEventWrapper) value);
             }
           } catch (NoSuchElementException ignore) {
             break;
@@ -3609,7 +3582,7 @@ public class HARegionQueue implements RegionQueue {
             // After the initial put to the container, the client update 
message is set to null.
             // Therefore we check if it is null and only add client CQs and 
interest lists if it
             // is not.
-            if (haContainerKey.getClientUpdateMessage() != null) {
+            if (inputHaEventWrapper.getClientUpdateMessage() != null) {
               addClientCQsAndInterestList(haContainerEntry, 
inputHaEventWrapper,
                   this.haContainer, this.regionName);
             }
@@ -3726,8 +3699,6 @@ public class HARegionQueue implements RegionQueue {
           wrapperSet.add(this.region.get(wrapperArray[i]));
         }
 
-        logger.info("RYGUY: Destroying HARegion in updateHAContainer()", new 
Exception());
-
         // Start a new thread which will update the clientMessagesRegion for
         // each of the HAEventWrapper instances present in the wrapperSet
         Thread regionCleanupTask = new Thread(new Runnable() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
index bc33dd2..f0cded5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
@@ -1213,6 +1213,7 @@ public class ClientUpdateMessageImpl implements 
ClientUpdateMessage, Sizeable, N
         
.append(";eventId=").append(_eventIdentifier).append(";shouldConflate=")
         
.append(_shouldConflate).append(";versionTag=").append(this.versionTag).append(";hasCqs=")
         .append(this._hasCqs)
+        .append(";clientInterestList=" + this._clientInterestList)
         // skip _logger :-)
         .append("]");
     return buffer.toString();

Reply via email to