This is an automated email from the ASF dual-hosted git repository.

jjramos pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.12 by this push:
     new f5f9a66  GEODE-7940: Fix Tracking of ShadowBuckets Destroyed (#4934)
f5f9a66 is described below

commit f5f9a6604aca81d377a2854b6c0473c27bb72e7d
Author: Juan José Ramos <[email protected]>
AuthorDate: Fri Apr 17 09:32:22 2020 +0100

    GEODE-7940: Fix Tracking of ShadowBuckets Destroyed (#4934)
    
    The BucketAdvisor can now keep track of more than just one shadow
    bucket to avoid incorrectly marking all of them as destroyed.
    
    - Added unit and distributed tests.
    
    (cherry picked from commit bfbb398891c5d96fa3a5975365b29d71bd849ad6)
---
 .../apache/geode/internal/cache/BucketAdvisor.java |  20 +++-
 .../internal/cache/PartitionedRegionDataStore.java |  10 +-
 .../wan/parallel/ParallelGatewaySenderQueue.java   |  99 +++++++++---------
 .../geode/internal/cache/BucketAdvisorTest.java    |  80 +++++++++++++-
 .../ParallelGatewaySenderOperationsDUnitTest.java  | 116 +++++++++++++++++++++
 5 files changed, 261 insertions(+), 64 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index 13e5a37..ac1fb4d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -33,6 +33,8 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -164,7 +166,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
 
   private PartitionedRegion pRegion;
 
-  private volatile boolean shadowBucketDestroyed;
+  final ConcurrentMap<String, Boolean> destroyedShadowBuckets = new ConcurrentHashMap<>();
 
   /**
    * Constructs a new BucketAdvisor for the Bucket owned by RegionAdvisor.
@@ -2742,11 +2744,19 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
     }
   }
 
-  void setShadowBucketDestroyed(boolean destroyed) {
-    shadowBucketDestroyed = destroyed;
+  void markAllShadowBucketsAsNonDestroyed() {
+    destroyedShadowBuckets.clear();
   }
 
-  public boolean getShadowBucketDestroyed() {
-    return shadowBucketDestroyed;
+  void markAllShadowBucketsAsDestroyed() {
+    destroyedShadowBuckets.forEach((k, v) -> destroyedShadowBuckets.put(k, true));
+  }
+
+  void markShadowBucketAsDestroyed(String shadowBucketPath) {
+    destroyedShadowBuckets.put(shadowBucketPath, true);
+  }
+
+  public boolean isShadowBucketDestroyed(String shadowBucketPath) {
+    return destroyedShadowBuckets.getOrDefault(shadowBucketPath, false);
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index 49a7c2a..077dc72 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -450,11 +450,11 @@ public class PartitionedRegionDataStore implements 
HasCachePerfStats {
               try {
                 buk.initializePrimaryElector(creationRequestor);
                 if (getPartitionedRegion().getColocatedWith() == null) {
-                  buk.getBucketAdvisor().setShadowBucketDestroyed(false);
+                  buk.getBucketAdvisor().markAllShadowBucketsAsNonDestroyed();
                 }
                 if (getPartitionedRegion().isShadowPR()) {
                   
getPartitionedRegion().getColocatedWithRegion().getRegionAdvisor()
-                      
.getBucketAdvisor(possiblyFreeBucketId).setShadowBucketDestroyed(false);
+                      
.getBucketAdvisor(possiblyFreeBucketId).markAllShadowBucketsAsNonDestroyed();
                 }
                 bukReg = createBucketRegion(possiblyFreeBucketId);
                 // Mark the bucket as hosting and distribute to peers
@@ -471,7 +471,7 @@ public class PartitionedRegionDataStore implements 
HasCachePerfStats {
                   bukReg.invokePartitionListenerAfterBucketCreated();
                 } else {
                   if (buk.getPartitionedRegion().getColocatedWith() == null) {
-                    buk.getBucketAdvisor().setShadowBucketDestroyed(true);
+                    buk.getBucketAdvisor().markAllShadowBucketsAsDestroyed();
                     // clear tempQueue for all the shadowPR buckets
                     clearAllTempQueueForShadowPR(buk.getBucketId());
                   }
@@ -1422,7 +1422,7 @@ public class PartitionedRegionDataStore implements 
HasCachePerfStats {
                     && buk.getPartitionedRegion().isShadowPR()) {
                   if (buk.getPartitionedRegion().getColocatedWithRegion() != 
null) {
                     
buk.getPartitionedRegion().getColocatedWithRegion().getRegionAdvisor()
-                        
.getBucketAdvisor(bucketId).setShadowBucketDestroyed(true);
+                        
.getBucketAdvisor(bucketId).markShadowBucketAsDestroyed(buk.getFullPath());
                   }
                 }
               } catch (RegionDestroyedException ignore) {
@@ -1592,7 +1592,7 @@ public class PartitionedRegionDataStore implements 
HasCachePerfStats {
         if (bucketRegion.getPartitionedRegion().isShadowPR()) {
           if (bucketRegion.getPartitionedRegion().getColocatedWithRegion() != 
null) {
             
bucketRegion.getPartitionedRegion().getColocatedWithRegion().getRegionAdvisor()
-                .getBucketAdvisor(bucketId).setShadowBucketDestroyed(true);
+                .getBucketAdvisor(bucketId).markAllShadowBucketsAsDestroyed();
           }
         }
         bucketAdvisor.getProxyBucketRegion().removeBucket();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index a4a9a28..baac4d4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -94,21 +94,18 @@ import 
org.apache.geode.management.internal.beans.AsyncEventQueueMBean;
 import org.apache.geode.management.internal.beans.GatewaySenderMBean;
 
 public class ParallelGatewaySenderQueue implements RegionQueue {
-
   protected static final Logger logger = LogService.getLogger();
-
-  protected final Map<String, PartitionedRegion> userRegionNameToshadowPRMap =
-      new ConcurrentHashMap<String, PartitionedRegion>();
+  protected final Map<String, PartitionedRegion> userRegionNameToShadowPRMap =
+      new ConcurrentHashMap<>();
+  private static final String SHADOW_BUCKET_PATH_PREFIX =
+      Region.SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME + 
Region.SEPARATOR;
 
   // <PartitionedRegion, Map<Integer, List<Object>>>
   private final Map regionToDispatchedKeysMap = new ConcurrentHashMap();
 
   protected final StoppableReentrantLock buckToDispatchLock;
-
   private final StoppableCondition regionToDispatchedKeysMapEmpty;
-
   protected final StoppableReentrantLock queueEmptyLock;
-
   private volatile boolean isQueueEmpty = true;
 
   /**
@@ -197,7 +194,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
       try {
         String regionPath =
             ColocationHelper.getLeaderRegion((PartitionedRegion) 
event.getRegion()).getFullPath();
-        prQ = userRegionNameToshadowPRMap.get(regionPath);
+        prQ = userRegionNameToShadowPRMap.get(regionPath);
         destroyEventFromQueue(prQ, bucketId, previousTailKeyTobeRemoved);
       } catch (EntryNotFoundException e) {
         if (logger.isDebugEnabled()) {
@@ -313,7 +310,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     try {
       String regionName = userRegion.getFullPath();
 
-      if (this.userRegionNameToshadowPRMap.containsKey(regionName))
+      if (this.userRegionNameToShadowPRMap.containsKey(regionName))
         return;
 
       InternalCache cache = sender.getCache();
@@ -419,7 +416,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
       }
     } finally {
       if (prQ != null) {
-        this.userRegionNameToshadowPRMap.put(userRegion.getFullPath(), prQ);
+        this.userRegionNameToShadowPRMap.put(userRegion.getFullPath(), prQ);
       }
       this.sender.getLifeCycleLock().writeLock().unlock();
     }
@@ -447,13 +444,13 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
         // to leader PR)
         // though, internally, colocate the GatewaySender's shadowPR with the 
leader PR in
         // colocation chain
-        if (!this.userRegionNameToshadowPRMap.containsKey(leaderRegionName)) {
+        if (!this.userRegionNameToShadowPRMap.containsKey(leaderRegionName)) {
           
addShadowPartitionedRegionForUserPR(ColocationHelper.getLeaderRegion(userPR));
         }
         return;
       }
 
-      if (this.userRegionNameToshadowPRMap.containsKey(regionName))
+      if (this.userRegionNameToShadowPRMap.containsKey(regionName))
         return;
 
       if (userPR.getDataPolicy().withPersistence() && 
!sender.isPersistenceEnabled()) {
@@ -552,7 +549,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
 
     } finally {
       if (prQ != null) {
-        this.userRegionNameToshadowPRMap.put(userPR.getFullPath(), prQ);
+        this.userRegionNameToShadowPRMap.put(userPR.getFullPath(), prQ);
       }
       /*
        * Here, enqueueTempEvents need to be invoked when a sender is already 
running and userPR is
@@ -676,9 +673,9 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     if (isDebugEnabled) {
       logger.debug("Put is for the region {}", region);
     }
-    if (!this.userRegionNameToshadowPRMap.containsKey(regionPath)) {
+    if (!this.userRegionNameToShadowPRMap.containsKey(regionPath)) {
       if (isDebugEnabled) {
-        logger.debug("The userRegionNameToshadowPRMap is {}", 
userRegionNameToshadowPRMap);
+        logger.debug("The userRegionNameToshadowPRMap is {}", 
userRegionNameToShadowPRMap);
       }
       logger.warn(
           "GatewaySender: Not queuing the event {}, as the region for which 
this event originated is not yet configured in the GatewaySender",
@@ -687,7 +684,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
       return false;
     }
 
-    PartitionedRegion prQ = this.userRegionNameToshadowPRMap.get(regionPath);
+    PartitionedRegion prQ = this.userRegionNameToShadowPRMap.get(regionPath);
     int bucketId = value.getBucketId();
     Object key = null;
     if (!isDREvent) {
@@ -716,16 +713,14 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
 
     try {
       if (brq == null) {
+        // Full path of the bucket
+        final String bucketFullPath = SHADOW_BUCKET_PATH_PREFIX + 
prQ.getBucketName(bucketId);
+
         // Set the threadInitLevel to BEFORE_INITIAL_IMAGE.
         final InitializationLevel oldLevel =
             LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE);
-        try {
-          // Full path of the bucket:
-
-          final String bucketFullPath =
-              Region.SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME + 
Region.SEPARATOR
-                  + prQ.getBucketName(bucketId);
 
+        try {
           brq = (AbstractBucketRegionQueue) 
prQ.getCache().getInternalRegionByPath(bucketFullPath);
           if (isDebugEnabled) {
             logger.debug(
@@ -752,8 +747,8 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
             // chain is getting destroyed one by one starting from child region
             // i.e this bucket due to moveBucket operation
             // In that case we don't want to store this event.
-            if (((PartitionedRegion) 
prQ.getColocatedWithRegion()).getRegionAdvisor()
-                .getBucketAdvisor(bucketId).getShadowBucketDestroyed()) {
+            if 
(prQ.getColocatedWithRegion().getRegionAdvisor().getBucketAdvisor(bucketId)
+                .isShadowBucketDestroyed(bucketFullPath)) {
               if (isDebugEnabled) {
                 logger.debug(
                     "ParallelGatewaySenderOrderedQueue not putting key {} : 
Value : {} as shadowPR bucket is destroyed.",
@@ -803,17 +798,16 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
           LocalRegion.setThreadInitLevelRequirement(oldLevel);
         }
       } else {
-        boolean thisbucketDestroyed = false;
+        boolean thisBucketDestroyed = brq.isDestroyed();
 
         if (!isDREvent) {
-          thisbucketDestroyed =
-              ((PartitionedRegion) 
prQ.getColocatedWithRegion()).getRegionAdvisor()
-                  .getBucketAdvisor(bucketId).getShadowBucketDestroyed() || 
brq.isDestroyed();
-        } else {
-          thisbucketDestroyed = brq.isDestroyed();
+          // Full path of the bucket
+          final String bucketFullPath = SHADOW_BUCKET_PATH_PREFIX + 
prQ.getBucketName(bucketId);
+          thisBucketDestroyed |= 
prQ.getColocatedWithRegion().getRegionAdvisor()
+              
.getBucketAdvisor(bucketId).isShadowBucketDestroyed(bucketFullPath);
         }
 
-        if (!thisbucketDestroyed) {
+        if (!thisBucketDestroyed) {
           putIntoBucketRegionQueue(brq, key, value);
           putDone = true;
         } else {
@@ -828,6 +822,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     } finally {
       notifyEventProcessorIfRequired();
     }
+
     return putDone;
   }
 
@@ -887,19 +882,19 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
    */
   @Override
   public Region getRegion() {
-    return this.userRegionNameToshadowPRMap.size() == 1
-        ? (Region) this.userRegionNameToshadowPRMap.values().toArray()[0] : 
null;
+    return this.userRegionNameToShadowPRMap.size() == 1
+        ? (Region) this.userRegionNameToShadowPRMap.values().toArray()[0] : 
null;
   }
 
   public PartitionedRegion getRegion(String fullpath) {
-    return this.userRegionNameToshadowPRMap.get(fullpath);
+    return this.userRegionNameToShadowPRMap.get(fullpath);
   }
 
   public PartitionedRegion removeShadowPR(String fullpath) {
     try {
       this.sender.getLifeCycleLock().writeLock().lock();
       this.sender.setEnqueuedAllTempQueueEvents(false);
-      return this.userRegionNameToshadowPRMap.remove(fullpath);
+      return this.userRegionNameToShadowPRMap.remove(fullpath);
     } finally {
       sender.getLifeCycleLock().writeLock().unlock();
     }
@@ -913,15 +908,15 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
    * Returns the set of shadowPR backign this queue.
    */
   public Set<PartitionedRegion> getRegions() {
-    return new HashSet(this.userRegionNameToshadowPRMap.values());
+    return new HashSet(this.userRegionNameToShadowPRMap.values());
   }
 
   // TODO: Find optimal way to get Random shadow pr as this will be called in 
each put and peek.
   protected PartitionedRegion getRandomShadowPR() {
     PartitionedRegion prQ = null;
-    if (this.userRegionNameToshadowPRMap.values().size() > 0) {
-      int randomIndex = new 
Random().nextInt(this.userRegionNameToshadowPRMap.size());
-      prQ = (PartitionedRegion) 
this.userRegionNameToshadowPRMap.values().toArray()[randomIndex];
+    if (this.userRegionNameToShadowPRMap.values().size() > 0) {
+      int randomIndex = new 
Random().nextInt(this.userRegionNameToShadowPRMap.size());
+      prQ = (PartitionedRegion) 
this.userRegionNameToShadowPRMap.values().toArray()[randomIndex];
     }
     return prQ;
   }
@@ -943,7 +938,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
 
   protected boolean areLocalBucketQueueRegionsPresent() {
     boolean bucketsAvailable = false;
-    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+    for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
       if (prQ.getDataStore().getAllLocalBucketRegions().size() > 0)
         return true;
     }
@@ -1007,11 +1002,11 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
         Object key = null;
         if (event.getRegion() != null) {
           if (isDREvent(sender.getCache(), event)) {
-            prQ = 
this.userRegionNameToshadowPRMap.get(event.getRegion().getFullPath());
+            prQ = 
this.userRegionNameToShadowPRMap.get(event.getRegion().getFullPath());
             bucketId = event.getEventId().getBucketID();
             key = event.getEventId();
           } else {
-            prQ = this.userRegionNameToshadowPRMap.get(ColocationHelper
+            prQ = this.userRegionNameToShadowPRMap.get(ColocationHelper
                 .getLeaderRegion((PartitionedRegion) 
event.getRegion()).getFullPath());
             bucketId = event.getBucketId();
             key = event.getShadowKey();
@@ -1023,11 +1018,11 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
           if (region != null && !region.isDestroyed()) {
             // TODO: We have to get colocated parent region for this region
             if (region instanceof DistributedRegion) {
-              prQ = this.userRegionNameToshadowPRMap.get(region.getFullPath());
+              prQ = this.userRegionNameToShadowPRMap.get(region.getFullPath());
               event.getBucketId();
               key = event.getEventId();
             } else {
-              prQ = this.userRegionNameToshadowPRMap
+              prQ = this.userRegionNameToShadowPRMap
                   .get(ColocationHelper.getLeaderRegion((PartitionedRegion) 
region).getFullPath());
               event.getBucketId();
               key = event.getShadowKey();
@@ -1442,7 +1437,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
   public String displayContent() {
     int size = 0;
     StringBuffer sb = new StringBuffer();
-    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+    for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
       if (prQ != null && prQ.getDataStore() != null) {
         Set<BucketRegion> allLocalBuckets = 
prQ.getDataStore().getAllLocalBucketRegions();
         for (BucketRegion br : allLocalBuckets) {
@@ -1461,7 +1456,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
 
   public int localSize(boolean includeSecondary) {
     int size = 0;
-    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+    for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
       if (prQ != null && prQ.getDataStore() != null) {
         if (includeSecondary) {
           size += prQ.getDataStore().getSizeOfLocalBuckets();
@@ -1479,7 +1474,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
 
   public int localSizeForProcessor() {
     int size = 0;
-    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+    for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
       if (((PartitionedRegion) prQ.getRegion()).getDataStore() != null) {
         Set<BucketRegion> primaryBuckets =
             ((PartitionedRegion) 
prQ.getRegion()).getDataStore().getAllLocalPrimaryBucketRegions();
@@ -1500,7 +1495,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
   @Override
   public int size() {
     int size = 0;
-    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+    for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
       if (logger.isDebugEnabled()) {
         logger.debug("The name of the queue region is {} and the size is {}. 
keyset size is {}",
             prQ.getName(), prQ.size(), prQ.keys().size());
@@ -1513,7 +1508,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
 
   @Override
   public void addCacheListener(CacheListener listener) {
-    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+    for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
       AttributesMutator mutator = prQ.getAttributesMutator();
       mutator.addCacheListener(listener);
     }
@@ -1539,7 +1534,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
 
   public long getNumEntriesOverflowOnDiskTestOnly() {
     long numEntriesOnDisk = 0;
-    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+    for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
       DiskRegionStats diskStats = prQ.getDiskRegionStats();
       if (diskStats == null) {
         if (logger.isDebugEnabled()) {
@@ -1561,7 +1556,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
 
   public long getNumEntriesInVMTestOnly() {
     long numEntriesInVM = 0;
-    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+    for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
       DiskRegionStats diskStats = prQ.getDiskRegionStats();
       if (diskStats == null) {
         if (logger.isDebugEnabled()) {
@@ -1848,7 +1843,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
 
   public long estimateMemoryFootprint(SingleObjectSizer sizer) {
     return sizer.sizeof(this) + sizer.sizeof(regionToDispatchedKeysMap)
-        + sizer.sizeof(userRegionNameToshadowPRMap) + 
sizer.sizeof(bucketToTempQueueMap)
+        + sizer.sizeof(userRegionNameToShadowPRMap) + 
sizer.sizeof(bucketToTempQueueMap)
         + sizer.sizeof(peekedEvents) + sizer.sizeof(conflationExecutor);
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
index 2eac9ac..d49430b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
@@ -14,8 +14,10 @@
  */
 package org.apache.geode.internal.cache;
 
+import static com.google.common.collect.ImmutableMap.of;
 import static 
org.apache.geode.internal.cache.CacheServerImpl.CACHE_SERVER_BIND_ADDRESS_NOT_AVAILABLE_EXCEPTION_MESSAGE;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doCallRealMethod;
@@ -28,6 +30,7 @@ import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -59,6 +62,7 @@ public class BucketAdvisorTest {
     ProxyBucketRegion mockBucket = mock(ProxyBucketRegion.class);
     RegionAdvisor mockRegionAdvisor = mock(RegionAdvisor.class);
     PartitionedRegion mockPartitionedRegion = mock(PartitionedRegion.class);
+    @SuppressWarnings("rawtypes")
     PartitionAttributes mockPartitionAttributes = 
mock(PartitionAttributes.class);
     DistributionManager mockDistributionManager = 
mock(DistributionManager.class);
     List<CacheServer> cacheServers = new ArrayList<>();
@@ -80,12 +84,13 @@ public class BucketAdvisorTest {
     assertThat(bucketAdvisor.getBucketServerLocations(0).size()).isEqualTo(0);
   }
 
-  @Test(expected = IllegalStateException.class)
+  @Test
   public void 
whenServerThrowsIllegalStateExceptionWithoutBindAddressMsgThenExceptionMustBeThrown()
 {
     InternalCache mockCache = mock(InternalCache.class);
     ProxyBucketRegion mockBucket = mock(ProxyBucketRegion.class);
     RegionAdvisor mockRegionAdvisor = mock(RegionAdvisor.class);
     PartitionedRegion mockPartitionedRegion = mock(PartitionedRegion.class);
+    @SuppressWarnings("rawtypes")
     PartitionAttributes mockPartitionAttributes = 
mock(PartitionAttributes.class);
     DistributionManager mockDistributionManager = 
mock(DistributionManager.class);
     List<CacheServer> cacheServers = new ArrayList<>();
@@ -103,7 +108,8 @@ public class BucketAdvisorTest {
     when(mockCacheServer.getExternalAddress()).thenThrow(new 
IllegalStateException());
 
     BucketAdvisor bucketAdvisor = 
BucketAdvisor.createBucketAdvisor(mockBucket, mockRegionAdvisor);
-    bucketAdvisor.getBucketServerLocations(0).size();
+    assertThatThrownBy(() -> bucketAdvisor.getBucketServerLocations(0))
+        .isInstanceOf(IllegalStateException.class);
   }
 
   @Test
@@ -150,4 +156,74 @@ public class BucketAdvisorTest {
     advisorSpy.volunteerForPrimary();
     verify(volunteeringDelegate).volunteerForPrimary();
   }
+
+  BucketAdvisor mockBucketAdvisorWithShadowBucketsDestroyedMap(Map<String, 
Boolean> shadowBuckets) {
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    when(distributionManager.getId()).thenReturn(new 
InternalDistributedMember("localhost", 321));
+
+    Bucket bucket = mock(Bucket.class);
+    when(bucket.isHosting()).thenReturn(true);
+    when(bucket.isPrimary()).thenReturn(false);
+    when(bucket.getDistributionManager()).thenReturn(distributionManager);
+
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    when(partitionedRegion.getRedundantCopies()).thenReturn(0);
+    when(partitionedRegion.getPartitionAttributes()).thenReturn(new 
PartitionAttributesImpl());
+    RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+    when(regionAdvisor.getPartitionedRegion()).thenReturn(partitionedRegion);
+
+    BucketAdvisor bucketAdvisor = BucketAdvisor.createBucketAdvisor(bucket, 
regionAdvisor);
+    bucketAdvisor.destroyedShadowBuckets.putAll(shadowBuckets);
+
+    return bucketAdvisor;
+  }
+
+  @Test
+  public void 
markAllShadowBucketsAsNonDestroyedShouldClearTheShadowBucketsDestroyedMap() {
+    Map<String, Boolean> buckets = of("/b1", false, "/b2", true);
+    BucketAdvisor bucketAdvisor = 
mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
+
+    assertThat(bucketAdvisor.destroyedShadowBuckets).isNotEmpty();
+    bucketAdvisor.markAllShadowBucketsAsNonDestroyed();
+    assertThat(bucketAdvisor.destroyedShadowBuckets).isEmpty();
+  }
+
+  @Test
+  public void 
markAllShadowBucketsAsDestroyedShouldSetTheFlagAsTrueForEveryKnownShadowBucket()
 {
+    Map<String, Boolean> buckets = of("/b1", false, "/b2", false, "/b3", 
false);
+    BucketAdvisor bucketAdvisor = 
mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
+
+    bucketAdvisor.destroyedShadowBuckets.forEach((k, v) -> 
assertThat(v).isFalse());
+    bucketAdvisor.markAllShadowBucketsAsDestroyed();
+    bucketAdvisor.destroyedShadowBuckets.forEach((k, v) -> 
assertThat(v).isTrue());
+  }
+
+  @Test
+  public void 
markShadowBucketAsDestroyedShouldSetTheFlagAsTrueOnlyForTheSpecificBucket() {
+    Map<String, Boolean> buckets = of("/b1", false);
+    BucketAdvisor bucketAdvisor = 
mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
+
+    // Known Shadow Bucket
+    assertThat(bucketAdvisor.destroyedShadowBuckets.get("/b1")).isFalse();
+    bucketAdvisor.markShadowBucketAsDestroyed("/b1");
+    assertThat(bucketAdvisor.destroyedShadowBuckets.get("/b1")).isTrue();
+
+    // Unknown Shadow Bucket
+    assertThat(bucketAdvisor.destroyedShadowBuckets.get("/b5")).isNull();
+    bucketAdvisor.markShadowBucketAsDestroyed("/b5");
+    assertThat(bucketAdvisor.destroyedShadowBuckets.get("/b5")).isTrue();
+  }
+
+  @Test
+  public void isShadowBucketDestroyedShouldReturnCorrectly() {
+    Map<String, Boolean> buckets = of("/b1", true, "/b2", false);
+    BucketAdvisor bucketAdvisor = 
mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
+
+    // Known Shadow Buckets
+    assertThat(bucketAdvisor.isShadowBucketDestroyed("/b1")).isTrue();
+    assertThat(bucketAdvisor.isShadowBucketDestroyed("/b2")).isFalse();
+
+    // Unknown Shadow Bucket
+    assertThat(bucketAdvisor.isShadowBucketDestroyed("/b5")).isFalse();
+  }
 }
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index 1d0ad7b..ff5f0f5 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -15,7 +15,9 @@
 package org.apache.geode.internal.cache.wan.parallel;
 
 import static 
org.apache.geode.distributed.internal.DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME;
+import static 
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPortsForDUnitSite;
 import static 
org.apache.geode.internal.cache.tier.sockets.Message.MAX_MESSAGE_SIZE_PROPERTY;
+import static org.apache.geode.internal.util.ArrayUtils.asList;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -25,14 +27,21 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import org.junit.Before;
 import org.junit.Rule;
@@ -63,6 +72,7 @@ import org.apache.geode.test.dunit.rules.ClusterStartupRule;
 import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
 import org.apache.geode.test.dunit.rules.MemberVM;
 import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
 
 /**
  * DUnit test for operations on ParallelGatewaySender
@@ -72,6 +82,9 @@ import org.apache.geode.test.junit.categories.WanTest;
 public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
 
   @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Rule
   public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
 
   @Rule
@@ -643,6 +656,109 @@ public class ParallelGatewaySenderOperationsDUnitTest 
extends WANTestBase {
   }
 
   @Test
+  public void 
destroyParallelGatewaySenderShouldNotStopDispatchingFromOtherSendersAttachedToTheRegion()
 {
+    String site2SenderId = "site2-sender";
+    String site3SenderId = "site3-sender";
+    String regionName = testName.getMethodName();
+    int[] ports = getRandomAvailableTCPPortsForDUnitSite(3);
+    int site1Port = ports[0];
+    int site2Port = ports[1];
+    int site3Port = ports[2];
+    Set<String> site1RemoteLocators =
+        Stream.of("localhost[" + site2Port + "]", "localhost[" + site3Port + 
"]")
+            .collect(Collectors.toSet());
+    Set<String> site2RemoteLocators =
+        Stream.of("localhost[" + site1Port + "]", "localhost[" + site3Port + 
"]")
+            .collect(Collectors.toSet());
+    Set<String> site3RemoteLocators =
+        Stream.of("localhost[" + site1Port + "]", "localhost[" + site2Port + 
"]")
+            .collect(Collectors.toSet());
+
+    // Start one locator per site (vm0, vm1, vm2); each is given the other two sites as remotes.
+    vm0.invoke(() -> createLocator(1, site1Port,
+        Collections.singleton("localhost[" + site1Port + "]"), 
site1RemoteLocators));
+    vm1.invoke(() -> createLocator(2, site2Port,
+        Collections.singleton("localhost[" + site2Port + "]"), 
site2RemoteLocators));
+    vm2.invoke(() -> createLocator(3, site3Port,
+        Collections.singleton("localhost[" + site3Port + "]"), 
site3RemoteLocators));
+
+    // Create one cache member per site: vm3 joins site 1, vm4 joins site 2, vm5 joins site 3.
+    createCacheInVMs(site1Port, vm3);
+    createCacheInVMs(site2Port, vm4);
+    createCacheInVMs(site3Port, vm5);
+
+    // Sites 2 and 3 only receive: create a receiver and the region with no sender ids (null).
+    asList(vm4, vm5).forEach(vm -> vm.invoke(() -> {
+      createReceiver();
+      createPartitionedRegion(regionName, null, 1, 113, isOffHeap());
+    }));
+
+    // Site 1 creates one sender per remote site, waits for both to run, then attaches both.
+    vm3.invoke(() -> {
+      createSender(site2SenderId, 2, true, 100, 20, false, false, null, false);
+      createSender(site3SenderId, 3, true, 100, 20, false, false, null, false);
+      waitForSenderRunningState(site2SenderId);
+      waitForSenderRunningState(site3SenderId);
+
+      createPartitionedRegion(regionName, String.join(",", site2SenderId, 
site3SenderId), 1, 113,
+          isOffHeap());
+    });
+
+    // Test data: keys Key0..Key99 form the first batch, Key100..Key199 the second batch.
+
+    final int FIRST_BATCH = 100;
+    final int SECOND_BATCH = 200;
+    final Map<String, String> firstBatch = new HashMap<>();
+    IntStream.range(0, FIRST_BATCH).forEach(i -> firstBatch.put("Key" + i, 
"Value" + i));
+    final Map<String, String> secondBatch = new HashMap<>();
+    IntStream.range(FIRST_BATCH, SECOND_BATCH)
+        .forEach(i -> secondBatch.put("Key" + i, "Value" + i));
+
+    // Insert first batch with both senders attached and wait until both queues are empty.
+    vm3.invoke(() -> {
+      cache.getRegion(regionName).putAll(firstBatch);
+      checkQueueSize(site2SenderId, 0);
+      checkQueueSize(site3SenderId, 0);
+    });
+
+    // Wait until sites 2 and 3 hold exactly the first batch (same size, same key/value pairs).
+    asList(vm4, vm5).forEach(vm -> vm.invoke(() -> {
+      Region<String, String> region = cache.getRegion(regionName);
+      await().untilAsserted(() -> 
assertThat(region.size()).isEqualTo(FIRST_BATCH));
+      firstBatch.forEach((key, value) -> 
assertThat(region.get(key)).isEqualTo(value));
+    }));
+
+    // Stop the sender to site 3, detach it from the region, destroy it and verify the destroy.
+    vm3.invoke(() -> {
+      stopSender(site3SenderId);
+      removeSenderFromTheRegion(site3SenderId, regionName);
+      destroySender(site3SenderId);
+      verifySenderDestroyed(site3SenderId, true);
+    });
+
+    // Insert second batch; only the site 2 sender is left, so only its queue must drain.
+    vm3.invoke(() -> {
+      cache.getRegion(regionName).putAll(secondBatch);
+      checkQueueSize(site2SenderId, 0);
+    });
+
+    // Site 3 lost its sender before the second batch, so it should only have the first batch.
+    vm5.invoke(() -> {
+      Region<String, String> region = cache.getRegion(regionName);
+      await().untilAsserted(() -> 
assertThat(region.size()).isEqualTo(FIRST_BATCH));
+      firstBatch.forEach((key, value) -> 
assertThat(region.get(key)).isEqualTo(value));
+    });
+
+    // Site 2 should have both batches; SECOND_BATCH (200) is the cumulative entry count.
+    vm4.invoke(() -> {
+      Region<String, String> region = cache.getRegion(regionName);
+      await().untilAsserted(() -> 
assertThat(region.size()).isEqualTo(SECOND_BATCH));
+      firstBatch.forEach((key, value) -> 
assertThat(region.get(key)).isEqualTo(value));
+      secondBatch.forEach((key, value) -> 
assertThat(region.get(key)).isEqualTo(value));
+    });
+  }
+
+  @Test
   public void testParallelGatewaySenderMessageTooLargeException() {
     vm4.invoke(() -> System.setProperty(MAX_MESSAGE_SIZE_PROPERTY, 
String.valueOf(1024 * 1024)));
 

Reply via email to