This is an automated email from the ASF dual-hosted git repository. mivanac pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 0d58250b23 GEODE-10277: For destroyed region don`t check size (#7653) 0d58250b23 is described below commit 0d58250b2336d547d6751e7f3d27f9a8cd432d51 Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Thu May 26 07:09:04 2022 +0200 GEODE-10277: For destroyed region don`t check size (#7653) --- .../wan/parallel/ParallelGatewaySenderQueue.java | 9 ++ ...ANPersistenceEnabledGatewaySenderDUnitTest.java | 102 +++++++++++++++++++++ 2 files changed, 111 insertions(+) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index d3361b5207..678286c92b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -1649,6 +1649,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { public int localSize(boolean includeSecondary) { int size = 0; for (PartitionedRegion prQ : userRegionNameToShadowPRMap.values()) { + if (prQ.isDestroyed()) { + continue; + } if (prQ.getDataStore() != null) { if (includeSecondary) { size += prQ.getDataStore().getSizeOfLocalBuckets(); @@ -1667,6 +1670,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { public int localSizeForProcessor() { int size = 0; for (PartitionedRegion prQ : userRegionNameToShadowPRMap.values()) { + if (prQ.isDestroyed()) { + continue; + } if (((PartitionedRegion) prQ.getRegion()).getDataStore() != null) { Set<BucketRegion> primaryBuckets = ((PartitionedRegion) prQ.getRegion()).getDataStore().getAllLocalPrimaryBucketRegions(); @@ -1689,6 +1695,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { public int size() { int size = 0; for (PartitionedRegion prQ : userRegionNameToShadowPRMap.values()) { + if (prQ.isDestroyed()) { + continue; + } if (logger.isDebugEnabled()) { logger.debug("The name of the queue region is {} and the size is {}. keyset size is {}", prQ.getName(), prQ.size(), prQ.keys().size()); diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java index fe2a02a542..42a4fd207b 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java @@ -2328,6 +2328,107 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends WANTest + /** + * Enable persistence for GatewaySender. Pause the sender and do some puts in local region. Stop + * GatewaySender. + * Then start GatewaySender with clean-queues option. Check if the remote site receives all the + * events. + */ + @Test + public void testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectNoException() + throws InterruptedException { + // create locator on local site + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + // create locator on remote site + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + // create cache in remote site + createCacheInVMs(nyPort, vm2, vm3); + + // create cache in local site + createCacheInVMs(lnPort, vm4, vm5); + + // create senders with disk store + String diskStore1 = vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, + true, 100, 10, false, true, null, null, true)); + String diskStore2 = vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, + true, 100, 10, false, true, null, null, true)); + + // create PR on remote site + vm2.invoke( + () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap())); + vm3.invoke( + () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap())); + + // create PR on local site + vm4.invoke(createPartitionedRegionRunnable()); + vm5.invoke(createPartitionedRegionRunnable()); + + // start the senders on local site + startSenderInVMs("ln", vm4, vm5); + + // wait for senders to become running + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + + logger.info("All senders are running."); + + // start puts in region on local site + vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 300000)); + logger.info("Completed puts in the region"); + + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0)); + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0)); + logger.info("Check that no events are propagated to remote site"); + + // --------------------close and rebuild local site + // ------------------------------------------------- + // stop the senders + + vm4.invoke(() -> WANTestBase.stopSender("ln")); + vm5.invoke(() -> WANTestBase.stopSender("ln")); + + logger.info("Stopped all the senders."); + + // wait for senders to stop + vm4.invoke(waitForSenderNonRunnable()); + vm5.invoke(waitForSenderNonRunnable()); + + // create receiver on remote site + createReceiverInVMs(vm2, vm3); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0)); + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0)); + + logger.info("Start all the senders."); + + AsyncInvocation<Void> startSenderwithCleanQueuesInVM4 = + vm4.invokeAsync(() -> startSenderwithCleanQueues("ln")); + + AsyncInvocation<Void> startSenderwithCleanQueuesInVM5 = + vm5.invokeAsync(() -> startSenderwithCleanQueues("ln")); + + vm4.invoke(() -> validateQueueSizeStat("ln", 0)); + vm5.invoke(() -> validateQueueSizeStat("ln", 0)); + + startSenderwithCleanQueuesInVM4.await(); + startSenderwithCleanQueuesInVM5.await(); + + logger.info("Waiting for senders running."); + // wait for senders running + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + + logger.info("All the senders are now running..."); + + // ---------------------------------------------------------------------------------------------------- + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0)); + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0)); + } + + private static class BlockingDestroyRegionObserver extends DistributionMessageObserver { private final CountDownLatch startedBlocking = new CountDownLatch(1); @@ -2339,6 +2440,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends WANTest } } + /** * setIgnoreQueue has lots of callers by reflection * <p>