This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 0d58250b23 GEODE-10277: For destroyed region don`t check size (#7653)
0d58250b23 is described below
commit 0d58250b2336d547d6751e7f3d27f9a8cd432d51
Author: Mario Ivanac <[email protected]>
AuthorDate: Thu May 26 07:09:04 2022 +0200
GEODE-10277: For destroyed region don`t check size (#7653)
---
.../wan/parallel/ParallelGatewaySenderQueue.java | 9 ++
...ANPersistenceEnabledGatewaySenderDUnitTest.java | 102 +++++++++++++++++++++
2 files changed, 111 insertions(+)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index d3361b5207..678286c92b 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1649,6 +1649,9 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
public int localSize(boolean includeSecondary) {
int size = 0;
for (PartitionedRegion prQ : userRegionNameToShadowPRMap.values()) {
+ if (prQ.isDestroyed()) {
+ continue;
+ }
if (prQ.getDataStore() != null) {
if (includeSecondary) {
size += prQ.getDataStore().getSizeOfLocalBuckets();
@@ -1667,6 +1670,9 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
public int localSizeForProcessor() {
int size = 0;
for (PartitionedRegion prQ : userRegionNameToShadowPRMap.values()) {
+ if (prQ.isDestroyed()) {
+ continue;
+ }
if (((PartitionedRegion) prQ.getRegion()).getDataStore() != null) {
Set<BucketRegion> primaryBuckets =
((PartitionedRegion)
prQ.getRegion()).getDataStore().getAllLocalPrimaryBucketRegions();
@@ -1689,6 +1695,9 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
public int size() {
int size = 0;
for (PartitionedRegion prQ : userRegionNameToShadowPRMap.values()) {
+ if (prQ.isDestroyed()) {
+ continue;
+ }
if (logger.isDebugEnabled()) {
logger.debug("The name of the queue region is {} and the size is {}.
keyset size is {}",
prQ.getName(), prQ.size(), prQ.keys().size());
diff --git
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
index fe2a02a542..42a4fd207b 100644
---
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
+++
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
@@ -2328,6 +2328,107 @@ public class
ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends WANTest
+ /**
+ * Enable persistence for GatewaySender. Pause the sender and do some puts
in local region. Stop
+ * GatewaySender.
+ * Then start GatewaySender with clean-queues option. Check if the remote
site receives all the
+ * events.
+ */
+ @Test
+ public void
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectNoException()
+ throws InterruptedException {
+ // create locator on local site
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ // create locator on remote site
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
+
+ // create cache in remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ // create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5);
+
+ // create senders with disk store
+ String diskStore1 = vm4.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore2 = vm5.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+
+ // create PR on remote site
+ vm2.invoke(
+ () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null,
1, 100, isOffHeap()));
+ vm3.invoke(
+ () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null,
1, 100, isOffHeap()));
+
+ // create PR on local site
+ vm4.invoke(createPartitionedRegionRunnable());
+ vm5.invoke(createPartitionedRegionRunnable());
+
+ // start the senders on local site
+ startSenderInVMs("ln", vm4, vm5);
+
+ // wait for senders to become running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+
+ logger.info("All senders are running.");
+
+ // start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 300000));
+ logger.info("Completed puts in the region");
+
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ logger.info("Check that no events are propagated to remote site");
+
+ // --------------------close and rebuild local site
+ // -------------------------------------------------
+ // stop the senders
+
+ vm4.invoke(() -> WANTestBase.stopSender("ln"));
+ vm5.invoke(() -> WANTestBase.stopSender("ln"));
+
+ logger.info("Stopped all the senders.");
+
+ // wait for senders to stop
+ vm4.invoke(waitForSenderNonRunnable());
+ vm5.invoke(waitForSenderNonRunnable());
+
+ // create receiver on remote site
+ createReceiverInVMs(vm2, vm3);
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+
+ logger.info("Start all the senders.");
+
+ AsyncInvocation<Void> startSenderwithCleanQueuesInVM4 =
+ vm4.invokeAsync(() -> startSenderwithCleanQueues("ln"));
+
+ AsyncInvocation<Void> startSenderwithCleanQueuesInVM5 =
+ vm5.invokeAsync(() -> startSenderwithCleanQueues("ln"));
+
+ vm4.invoke(() -> validateQueueSizeStat("ln", 0));
+ vm5.invoke(() -> validateQueueSizeStat("ln", 0));
+
+ startSenderwithCleanQueuesInVM4.await();
+ startSenderwithCleanQueuesInVM5.await();
+
+ logger.info("Waiting for senders running.");
+ // wait for senders running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+
+ logger.info("All the senders are now running...");
+
+ //
----------------------------------------------------------------------------------------------------
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ }
+
+
private static class BlockingDestroyRegionObserver extends
DistributionMessageObserver {
private final CountDownLatch startedBlocking = new CountDownLatch(1);
@@ -2339,6 +2440,7 @@ public class
ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends WANTest
}
}
+
/**
* setIgnoreQueue has lots of callers by reflection
* <p>