This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 1cab75d GEODE-8600: Fix for faulty statistics QueueSize (#5616)
1cab75d is described below
commit 1cab75df7f95e82d113b4f81596bf2eda1e333fc
Author: Mario Ivanac <[email protected]>
AuthorDate: Tue Oct 13 11:25:22 2020 +0200
GEODE-8600: Fix for faulty statistics QueueSize (#5616)
---
.../wan/parallel/ParallelGatewaySenderQueue.java | 3 ++
.../geode/internal/cache/wan/WANTestBase.java | 9 +++++
.../wan/parallel/ParallelWANStatsDUnitTest.java | 43 ++++++++++++++++++++++
3 files changed, 55 insertions(+)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 108eff5..88ef0b1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -278,6 +278,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
for (Region userRegion : listOfRegions) {
if (userRegion instanceof PartitionedRegion) {
addShadowPartitionedRegionForUserPR((PartitionedRegion) userRegion);
+ if (index == 0 && getRegion(userRegion.getFullPath()) != null) {
+ this.stats.incQueueSize(getRegion(userRegion.getFullPath()).getLocalSize());
+ }
} else {
// Fix for Bug#51491. Once decided to support this configuration we have call
// addShadowPartitionedRegionForUserRR
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 883b313..8446885 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -1202,6 +1202,15 @@ public class WANTestBase extends DistributedTestCase {
return statistics.getSecondaryEventQueueSize();
}
+ public static void checkQueueSizeInStats(String senderId, final int expectedQueueSize) {
+ AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
+ GatewaySenderStats statistics = sender.getStatistics();
+ await()
+ .untilAsserted(() -> assertEquals("Expected queue size: " + expectedQueueSize
+ + " but actual size: " + statistics.getEventQueueSize(), expectedQueueSize,
+ statistics.getEventQueueSize()));
+ }
+
public static void checkConnectionStats(String senderId) {
AbstractGatewaySender sender =
(AbstractGatewaySender) CacheFactory.getAnyInstance().getGatewaySender(senderId);
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index 7155975..7ff1c8d 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -1054,6 +1054,49 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
verifyConflationIndexesSize(senderId, 0, vm1);
}
+
+ @Test
+ public void testPartitionedRegionParallelPropagation_RestartSenders_NoRedundancy() {
+ Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
+
+ createSenders(lnPort);
+
+ createReceiverPR(vm2, 0);
+
+ createSenderPRs(0);
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ // pause the senders
+ vm4.invoke(() -> WANTestBase.pauseSender("ln"));
+ vm5.invoke(() -> WANTestBase.pauseSender("ln"));
+ vm6.invoke(() -> WANTestBase.pauseSender("ln"));
+ vm7.invoke(() -> WANTestBase.pauseSender("ln"));
+
+ vm4.invoke(() -> WANTestBase.doPuts(testName, NUM_PUTS));
+
+ vm4.invoke(() -> WANTestBase.stopSender("ln"));
+ vm5.invoke(() -> WANTestBase.stopSender("ln"));
+ vm6.invoke(() -> WANTestBase.stopSender("ln"));
+ vm7.invoke(() -> WANTestBase.stopSender("ln"));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.checkQueueSizeInStats("ln", 0));
+ vm5.invoke(() -> WANTestBase.checkQueueSizeInStats("ln", 0));
+ vm6.invoke(() -> WANTestBase.checkQueueSizeInStats("ln", 0));
+ vm7.invoke(() -> WANTestBase.checkQueueSizeInStats("ln", 0));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
+
+ }
+
+
+
protected Map putKeyValues() {
final Map keyValues = new HashMap();
for (int i = 0; i < NUM_PUTS; i++) {