This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0d58250b23 GEODE-10277: For destroyed region don't check size (#7653)
0d58250b23 is described below

commit 0d58250b2336d547d6751e7f3d27f9a8cd432d51
Author: Mario Ivanac <48509724+miva...@users.noreply.github.com>
AuthorDate: Thu May 26 07:09:04 2022 +0200

    GEODE-10277: For destroyed region don't check size (#7653)
---
 .../wan/parallel/ParallelGatewaySenderQueue.java   |   9 ++
 ...ANPersistenceEnabledGatewaySenderDUnitTest.java | 102 +++++++++++++++++++++
 2 files changed, 111 insertions(+)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index d3361b5207..678286c92b 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1649,6 +1649,9 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
   public int localSize(boolean includeSecondary) {
     int size = 0;
     for (PartitionedRegion prQ : userRegionNameToShadowPRMap.values()) {
+      if (prQ.isDestroyed()) {
+        continue;
+      }
       if (prQ.getDataStore() != null) {
         if (includeSecondary) {
           size += prQ.getDataStore().getSizeOfLocalBuckets();
@@ -1667,6 +1670,9 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
   public int localSizeForProcessor() {
     int size = 0;
     for (PartitionedRegion prQ : userRegionNameToShadowPRMap.values()) {
+      if (prQ.isDestroyed()) {
+        continue;
+      }
       if (((PartitionedRegion) prQ.getRegion()).getDataStore() != null) {
         Set<BucketRegion> primaryBuckets =
             ((PartitionedRegion) 
prQ.getRegion()).getDataStore().getAllLocalPrimaryBucketRegions();
@@ -1689,6 +1695,9 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
   public int size() {
     int size = 0;
     for (PartitionedRegion prQ : userRegionNameToShadowPRMap.values()) {
+      if (prQ.isDestroyed()) {
+        continue;
+      }
       if (logger.isDebugEnabled()) {
         logger.debug("The name of the queue region is {} and the size is {}. 
keyset size is {}",
             prQ.getName(), prQ.size(), prQ.keys().size());
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
index fe2a02a542..42a4fd207b 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
@@ -2328,6 +2328,107 @@ public class 
ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends WANTest
 
 
 
+  /**
+   * Enable persistence for GatewaySender. Do some puts in local region while the remote site
+   * has no receiver, then stop GatewaySender.
+   * Then start GatewaySender with clean-queues option. Check that the sender queue is empty
+   * and that the remote site receives none of the events.
+   */
+  @Test
+  public void 
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectNoException()
+      throws InterruptedException {
+    // create locator on local site
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    // create locator on remote site
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    // create cache in remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+
+    // create cache in local site
+    createCacheInVMs(lnPort, vm4, vm5);
+
+    // create senders with disk store
+    String diskStore1 = vm4.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+    String diskStore2 = vm5.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+
+    // create PR on remote site
+    vm2.invoke(
+        () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 
1, 100, isOffHeap()));
+    vm3.invoke(
+        () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 
1, 100, isOffHeap()));
+
+    // create PR on local site
+    vm4.invoke(createPartitionedRegionRunnable());
+    vm5.invoke(createPartitionedRegionRunnable());
+
+    // start the senders on local site
+    startSenderInVMs("ln", vm4, vm5);
+
+    // wait for senders to become running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+
+    logger.info("All senders are running.");
+
+    // start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 300000));
+    logger.info("Completed puts in the region");
+
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+    logger.info("Check that no events are propagated to remote site");
+
+    // --------------------close and rebuild local site
+    // -------------------------------------------------
+    // stop the senders
+
+    vm4.invoke(() -> WANTestBase.stopSender("ln"));
+    vm5.invoke(() -> WANTestBase.stopSender("ln"));
+
+    logger.info("Stopped all the senders.");
+
+    // wait for senders to stop
+    vm4.invoke(waitForSenderNonRunnable());
+    vm5.invoke(waitForSenderNonRunnable());
+
+    // create receiver on remote site
+    createReceiverInVMs(vm2, vm3);
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+
+    logger.info("Start all the senders.");
+
+    AsyncInvocation<Void> startSenderwithCleanQueuesInVM4 =
+        vm4.invokeAsync(() -> startSenderwithCleanQueues("ln"));
+
+    AsyncInvocation<Void> startSenderwithCleanQueuesInVM5 =
+        vm5.invokeAsync(() -> startSenderwithCleanQueues("ln"));
+
+    vm4.invoke(() -> validateQueueSizeStat("ln", 0));
+    vm5.invoke(() -> validateQueueSizeStat("ln", 0));
+
+    startSenderwithCleanQueuesInVM4.await();
+    startSenderwithCleanQueuesInVM5.await();
+
+    logger.info("Waiting for senders running.");
+    // wait for senders running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+
+    logger.info("All the senders are now running...");
+
+    // 
----------------------------------------------------------------------------------------------------
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+  }
+
+
   private static class BlockingDestroyRegionObserver extends 
DistributionMessageObserver {
     private final CountDownLatch startedBlocking = new CountDownLatch(1);
 
@@ -2339,6 +2440,7 @@ public class 
ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends WANTest
     }
   }
 
+
   /**
    * setIgnoreQueue has lots of callers by reflection
    * <p>

Reply via email to