This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-4647 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 613aae452a8e3067bd85a77fdfce6b963b9e23e3 Author: zhouxh <gz...@pivotal.io> AuthorDate: Wed Feb 21 21:31:07 2018 -0800 GEODE-4647: add a stats eventSecondaryQueueSizeId to track events in secondary gateway sender queue --- .../asyncqueue/internal/AsyncEventQueueStats.java | 3 + .../internal/cache/AbstractBucketRegionQueue.java | 8 ++ .../geode/internal/cache/AbstractRegionMap.java | 12 ++ .../apache/geode/internal/cache/BucketAdvisor.java | 2 + .../geode/internal/cache/BucketRegionQueue.java | 2 + .../internal/cache/wan/AbstractGatewaySender.java | 1 + .../internal/cache/wan/GatewaySenderStats.java | 61 +++++++++ .../wan/parallel/ParallelGatewaySenderQueue.java | 8 +- .../wan/parallel/ParallelQueueRemovalMessage.java | 3 + .../SerialAsyncEventQueueImplJUnitTest.java | 3 + .../cache/wan/AsyncEventQueueTestBase.java | 18 ++- .../asyncqueue/AsyncEventListenerDUnitTest.java | 8 +- .../asyncqueue/AsyncEventQueueStatsDUnitTest.java | 48 +++++-- .../ParallelQueueRemovalMessageJUnitTest.java | 12 ++ .../bean/stats/AsyncEventQueueStatsJUnitTest.java | 2 - .../geode/internal/cache/wan/WANTestBase.java | 79 +++++++++-- .../parallel/ParallelWANConflationDUnitTest.java | 56 ++++++-- .../wan/parallel/ParallelWANStatsDUnitTest.java | 150 +++++++++++++++++++++ .../serial/SerialGatewaySenderQueueDUnitTest.java | 12 +- .../wan/serial/SerialWANConflationDUnitTest.java | 73 +++++++++- .../wan/serial/SerialWANPropagationDUnitTest.java | 1 + 21 files changed, 517 insertions(+), 45 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java index 2f3029a..8d68cee 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java @@ -42,6 +42,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats { f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing events.", "nanoseconds"), f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false), + f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of the secondary event queue.", + "operations", false), f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events queue.", "operations", false), f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED, @@ -108,6 +110,7 @@ public class AsyncEventQueueStats extends GatewaySenderStats { eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED); eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME); eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE); + eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE); eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE); eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED); eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java index af62f74..2406b18 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java @@ -220,6 +220,10 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { this.gatewaySenderStats.decQueueSize(size); } + public void decSecondaryQueueSize(int size) { + this.gatewaySenderStats.decSecondaryQueueSize(size); + } + public void decQueueSize() { this.gatewaySenderStats.decQueueSize(); } @@ -228,6 +232,10 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { this.gatewaySenderStats.incQueueSize(size); } + public void incSecondaryQueueSize(int size) { + this.gatewaySenderStats.incSecondaryQueueSize(size); + } + public void incQueueSize() { this.gatewaySenderStats.incQueueSize(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java index 2c57182..0b2e68c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java @@ -635,6 +635,10 @@ public abstract class AbstractRegionMap tombstones.put(tag, newRe); } else { _getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(newRe)); + if (_getOwner() instanceof BucketRegionQueue) { + BucketRegionQueue brq = (BucketRegionQueue) _getOwner(); + brq.incSecondaryQueueSize(1); + } } incEntryCount(1); lruEntryUpdate(newRe); @@ -660,6 +664,10 @@ public abstract class AbstractRegionMap } else { _getOwner().updateSizeOnCreate(re.getKey(), _getOwner().calculateRegionEntryValueSize(re)); + if (_getOwner() instanceof BucketRegionQueue) { + BucketRegionQueue brq = (BucketRegionQueue) _getOwner(); + brq.incSecondaryQueueSize(1); + } } } incEntryCount(size()); @@ -1043,6 +1051,10 @@ public abstract class AbstractRegionMap } finally { if (done && result) { initialImagePutEntry(newRe); + if (owner instanceof BucketRegionQueue) { + BucketRegionQueue brq = (BucketRegionQueue) owner; + brq.addToEventQueue(key, done, event); + } } if (!done) { removeEntry(key, newRe, false); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java index 4708fd9..af386f5 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java @@ -314,6 +314,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor { if (b instanceof BucketRegionQueue) { BucketRegionQueue brq = (BucketRegionQueue) b; brq.decQueueSize(brq.size()); + brq.incSecondaryQueueSize(brq.size()); } } } @@ -1189,6 +1190,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor { if (br instanceof BucketRegionQueue) { // Shouldn't it be AbstractBucketRegionQueue BucketRegionQueue brq = (BucketRegionQueue) br; brq.incQueueSize(brq.size()); + brq.decSecondaryQueueSize(brq.size()); } if (br != null && br instanceof BucketRegion) { ((BucketRegion) br).afterAcquiringPrimaryState(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java index 07c32a1..afc1544 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java @@ -438,6 +438,8 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { } if (this.getBucketAdvisor().isPrimary()) { incQueueSize(1); + } else { + incSecondaryQueueSize(1); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 0fef764..59547b2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -1098,6 +1098,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi } statistics.setQueueSize(0); + statistics.setSecondaryQueueSize(0); statistics.setTempQueueSize(0); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java index 2b93082..15ff18e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java @@ -46,6 +46,8 @@ public class GatewaySenderStats { protected static final String EVENT_QUEUE_TIME = "eventQueueTime"; /** Name of the event queue size statistic */ protected static final String EVENT_QUEUE_SIZE = "eventQueueSize"; + /** Name of the event secondary queue size statistic */ + protected static final String EVENT_SECONDARY_QUEUE_SIZE = "eventSecondaryQueueSize"; /** Name of the event temporary queue size statistic */ protected static final String TMP_EVENT_QUEUE_SIZE = "tempQueueSize"; /** Name of the events distributed statistic */ @@ -104,6 +106,8 @@ public class GatewaySenderStats { protected static int eventQueueTimeId; /** Id of the event queue size statistic */ protected static int eventQueueSizeId; + /** Id of the event in secondary queue size statistic */ + protected static int eventSecondaryQueueSizeId; /** Id of the temp event queue size statistic */ protected static int eventTmpQueueSizeId; /** Id of the events distributed statistic */ @@ -168,6 +172,8 @@ public class GatewaySenderStats { f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing events.", "nanoseconds"), f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false), + f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of secondary event queue.", + "operations", false), f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events.", "operations", false), f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED, @@ -238,6 +244,7 @@ public class GatewaySenderStats { eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED); eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME); eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE); + eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE); eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE); eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED); eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD); @@ -358,6 +365,15 @@ public class GatewaySenderStats { } /** + * Returns the current value of the "eventSecondaryQueueSize" stat. + * + * @return the current value of the "eventSecondaryQueueSize" stat + */ + public int getEventSecondaryQueueSize() { + return this.stats.getInt(eventSecondaryQueueSizeId); + } + + /** * Returns the current value of the "tempQueueSize" stat. * * @return the current value of the "tempQueueSize" stat. @@ -462,6 +478,15 @@ public class GatewaySenderStats { } /** + * Sets the "eventSecondaryQueueSize" stat. + * + * @param size The size of the secondary queue + */ + public void setSecondaryQueueSize(int size) { + this.stats.setInt(eventSecondaryQueueSizeId, size); + } + + /** * Sets the "tempQueueSize" stat. * * @param size The size of the temp queue @@ -479,6 +504,14 @@ public class GatewaySenderStats { } /** + * Increments the "eventSecondaryQueueSize" stat by 1. + */ + public void incSecondaryQueueSize() { + this.stats.incInt(eventSecondaryQueueSizeId, 1); + assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0; + } + + /** * Increments the "tempQueueSize" stat by 1. */ public void incTempQueueSize() { @@ -495,6 +528,16 @@ public class GatewaySenderStats { } /** + * Increments the "eventSecondaryQueueSize" stat by given delta. + * + * @param delta an integer by which secondary queue size to be increased + */ + public void incSecondaryQueueSize(int delta) { + this.stats.incInt(eventSecondaryQueueSizeId, delta); + assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0; + } + + /** * Increments the "tempQueueSize" stat by given delta. * * @param delta an integer by which temp queue size to be increased @@ -511,6 +554,14 @@ public class GatewaySenderStats { } /** + * Decrements the "eventSecondaryQueueSize" stat by 1. + */ + public void decSecondaryQueueSize() { + this.stats.incInt(eventSecondaryQueueSizeId, -1); + assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0; + } + + /** * Decrements the "tempQueueSize" stat by 1. */ public void decTempQueueSize() { @@ -527,6 +578,16 @@ public class GatewaySenderStats { } /** + * Decrements the "eventSecondaryQueueSize" stat by given delta. + * + * @param delta an integer by which secondary queue size to be increased + */ + public void decSecondaryQueueSize(int delta) { + this.stats.incInt(eventSecondaryQueueSizeId, -delta); + assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0; + } + + /** * Decrements the "tempQueueSize" stat by given delta. * * @param delta an integer by which temp queue size to be increased diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index cdb33ab..28c437f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -1416,10 +1416,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { return (BucketRegionQueue) prQ.getDataStore().getLocalBucketById(bucketId); } - public int localSize() { - return localSize(false); - } - public String displayContent() { int size = 0; StringBuffer sb = new StringBuffer(); @@ -1436,6 +1432,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue { return sb.toString(); } + public int localSize() { + return localSize(false); + } + public int localSize(boolean includeSecondary) { int size = 0; for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java index 39fedbf..df89e36 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java @@ -183,6 +183,9 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage { final boolean isDebugEnabled = logger.isDebugEnabled(); try { brq.destroyKey(key); + if (!brq.getBucketAdvisor().isPrimary()) { + prQ.getParallelGatewaySender().getStatistics().decSecondaryQueueSize(); + } if (isDebugEnabled) { logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", key, prQ.getName(), brq.getId()); diff --git a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java index eb8ad01..4c5caa2 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java @@ -50,14 +50,17 @@ public class SerialAsyncEventQueueImplJUnitTest { attrs.id = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id"; SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache, attrs); queue.getStatistics().incQueueSize(5); + queue.getStatistics().incSecondaryQueueSize(6); queue.getStatistics().incTempQueueSize(10); assertEquals(5, queue.getStatistics().getEventQueueSize()); + assertEquals(6, queue.getStatistics().getEventSecondaryQueueSize()); assertEquals(10, queue.getStatistics().getTempEventQueueSize()); queue.stop(); assertEquals(0, queue.getStatistics().getEventQueueSize()); + assertEquals(0, queue.getStatistics().getEventSecondaryQueueSize()); assertEquals(0, queue.getStatistics().getTempEventQueueSize()); } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java index 8366ca7..2074e9e 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java @@ -718,19 +718,33 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase { } public static void checkAsyncEventQueueStats(String queueId, final int queueSize, - final int eventsReceived, final int eventsQueued, final int eventsDistributed) { + int secondaryQueueSize, final int eventsReceived, final int eventsQueued, + final int eventsDistributed) { Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues(); AsyncEventQueue queue = null; + boolean isParallel = false; for (AsyncEventQueue q : asyncQueues) { + isParallel = q.isParallel(); if (q.getId().equals(queueId)) { queue = q; break; } } final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl) queue).getStatistics(); - Awaitility.await().atMost(60, TimeUnit.SECONDS) + Awaitility.await().atMost(120, TimeUnit.SECONDS) .until(() -> assertEquals("Expected queue entries: " + queueSize + " but actual entries: " + statistics.getEventQueueSize(), queueSize, statistics.getEventQueueSize())); + if (isParallel) { + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> { + assertEquals( + "Expected events in the secondary queue is " + secondaryQueueSize + ", but actual is " + + statistics.getEventSecondaryQueueSize(), + secondaryQueueSize, statistics.getEventSecondaryQueueSize()); + }); + } else { + // for serial queue, evenvSecondaryQueueSize is not used + assertEquals(0, statistics.getEventSecondaryQueueSize()); + } assertEquals(queueSize, statistics.getEventQueueSize()); assertEquals(eventsReceived, statistics.getEventsReceived()); assertEquals(eventsQueued, statistics.getEventsQueued()); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java index aa1db53..442ef9f 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java @@ -1521,8 +1521,8 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { LogWriterUtils.getLogWriter().info("Primary buckets on vm2: " + primaryBucketsvm2); // before shutdown vm2, both vm1 and vm2 should have 40 events in primary queue - vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 40, 80, 80, 0)); - vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 40, 80, 80, 0)); + vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 40, 40, 80, 80, 0)); + vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 40, 40, 80, 80, 0)); // ---------------------------- Kill vm2 -------------------------- vm2.invoke(() -> AsyncEventQueueTestBase.killSender()); @@ -1549,8 +1549,8 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { .info("After shutdown vm2, started vm3, Primary buckets on vm1: " + primaryBucketsvm1); // vm1.invoke(()->AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 80, 80, 80, 0)); - vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 40, 80, 80, 0)); - vm3.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 40, 0, 0, 0)); + vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 40, 40, 80, 80, 0)); + vm3.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 40, 40, 0, 0, 0)); vm3.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln")); vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln")); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java index 935a650..3ea44eb 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java @@ -27,6 +27,8 @@ import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.Wait; import org.apache.geode.test.junit.categories.AEQTest; import org.apache.geode.test.junit.categories.DistributedTest; +import org.apache.geode.test.junit.categories.FlakyTest; +import org.apache.geode.test.junit.categories.WanTest; @Category({DistributedTest.class, AEQTest.class}) public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase { @@ -74,15 +76,17 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase { // sender Wait.pause(2000);// give some time for system to become stable - vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1000, 1000, 1000)); + vm1.invoke( + () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1000, 1000, 1000)); vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln", 10)); - vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1000, 0, 0)); + vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1000, 0, 0)); vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln", 0)); } /** * Two listeners added to the same RR. */ + @Category({FlakyTest.class, WanTest.class}) // GEODE-4647 @Test public void testAsyncStatsTwoListeners() throws Exception { Integer lnPort = createFirstLocatorWithDSId(1); @@ -119,19 +123,43 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase { vm4.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln1,ln2", isOffHeap())); + vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1")); + vm2.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1")); + vm3.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1")); + vm4.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1")); + vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2")); + vm2.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2")); + vm3.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2")); + vm4.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2")); + vm1.invoke(() -> AsyncEventQueueTestBase.doPuts(getTestMethodName() + "_RR", 1000)); + vm1.invoke( + () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 1000, 0, 1000, 1000, 0)); + vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 1000, 0, 1000, 0, 0)); + + vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1")); + vm2.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1")); + vm3.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1")); + vm4.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1")); + vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2")); + vm2.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2")); + vm3.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2")); + vm4.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2")); + vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln1", 1000)); vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln2", 1000)); Wait.pause(2000);// give some time for system to become stable - vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 1000, 1000, 1000)); + vm1.invoke( + () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 0, 1000, 1000, 1000)); vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln1", 10)); - vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 1000, 1000, 1000)); + vm1.invoke( + () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 0, 1000, 1000, 1000)); vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln2", 10)); - vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 1000, 0, 0)); + vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 0, 1000, 0, 0)); vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln1", 0)); - vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 1000, 0, 0)); + vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 0, 1000, 0, 0)); vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln2", 0)); } @@ -229,11 +257,12 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase { vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 1500)); Wait.pause(2000);// give some time for system to become stable - vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1500, 1500, 1500)); + vm1.invoke( + () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1500, 1500, 1500)); vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueUnprocessedStats("ln", 0)); - vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1500, 0, 0)); + vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1500, 0, 0)); vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueUnprocessedStats("ln", 1500)); } @@ -301,7 +330,8 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase { vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 1000)); Wait.pause(2000);// give some time for system to become stable - vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 2000, 2000, 1000)); + vm1.invoke( + () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 2000, 2000, 1000)); vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueConflatedStats("ln", 500)); } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java index 5e0f704..d1ea59f 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java @@ -65,6 +65,8 @@ import org.apache.geode.internal.cache.eviction.EvictionController; import org.apache.geode.internal.cache.partitioned.RegionAdvisor; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; +import org.apache.geode.internal.cache.wan.GatewaySenderStats; +import org.apache.geode.internal.statistics.DummyStatisticsFactory; import org.apache.geode.test.fake.Fakes; import org.apache.geode.test.junit.categories.UnitTest; @@ -81,6 +83,7 @@ public class ParallelQueueRemovalMessageJUnitTest { private PartitionedRegion rootRegion; private BucketRegionQueue bucketRegionQueue; private BucketRegionQueueHelper bucketRegionQueueHelper; + private GatewaySenderStats stats; @Before public void setUpGemFire() { @@ -116,6 +119,8 @@ public class ParallelQueueRemovalMessageJUnitTest { when(this.queueRegion.getParallelGatewaySender()).thenReturn(this.sender); when(this.sender.getQueues()).thenReturn(null); when(this.sender.getDispatcherThreads()).thenReturn(1); + stats = new GatewaySenderStats(new DummyStatisticsFactory(), "ln"); + when(this.sender.getStatistics()).thenReturn(stats); } private void createRootRegion() { @@ -183,6 +188,7 @@ public class ParallelQueueRemovalMessageJUnitTest { // Validate initial BucketRegionQueue state assertFalse(this.bucketRegionQueue.isInitialized()); assertEquals(0, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size()); + stats.setSecondaryQueueSize(1); // Create and process a ParallelQueueRemovalMessage (causes the failedBatchRemovalMessageKeys to // add a key) @@ -190,6 +196,8 @@ public class ParallelQueueRemovalMessageJUnitTest { // Validate BucketRegionQueue after processing ParallelQueueRemovalMessage assertEquals(1, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size()); + // failed BatchRemovalMessage will not modify stats + assertEquals(1, stats.getEventSecondaryQueueSize()); } @Test @@ -201,6 +209,7 @@ public class ParallelQueueRemovalMessageJUnitTest { // Add an event to the BucketRegionQueue and verify BucketRegionQueue state this.bucketRegionQueueHelper.addEvent(KEY); assertEquals(1, this.bucketRegionQueue.size()); + assertEquals(1, stats.getEventSecondaryQueueSize()); // Create and process a ParallelQueueRemovalMessage (causes the value of the entry to be set to // DESTROYED) @@ -210,6 +219,7 @@ public class ParallelQueueRemovalMessageJUnitTest { // Clean up destroyed tokens and validate BucketRegionQueue this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete(); assertEquals(0, this.bucketRegionQueue.size()); + assertEquals(0, stats.getEventSecondaryQueueSize()); } @Test @@ -247,6 +257,7 @@ public class ParallelQueueRemovalMessageJUnitTest { // Add an event to the BucketRegionQueue and verify BucketRegionQueue state GatewaySenderEventImpl event = this.bucketRegionQueueHelper.addEvent(KEY); assertEquals(1, this.bucketRegionQueue.size()); + assertEquals(1, stats.getEventSecondaryQueueSize()); // Add a mock GatewaySenderEventImpl to the temp queue BlockingQueue<GatewaySenderEventImpl> tempQueue = createTempQueueAndAddEvent(processor, event); @@ -259,6 +270,7 @@ public class ParallelQueueRemovalMessageJUnitTest { // Validate temp queue is empty after processing ParallelQueueRemovalMessage assertEquals(0, tempQueue.size()); + assertEquals(0, stats.getEventSecondaryQueueSize()); // Clean up destroyed tokens this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete(); diff --git a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java index 48ca857..7c485be 100644 --- a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java @@ -49,8 +49,6 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase { sample(); assertEquals(0, getEventQueueSize()); - - } private int getEventQueueSize() { diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java index 30b7972..f989405 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -124,6 +124,7 @@ import org.apache.geode.distributed.Locator; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.InternalLocator; import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.AvailablePort; import org.apache.geode.internal.AvailablePortHelper; import org.apache.geode.internal.admin.remote.DistributionLocatorId; @@ -142,6 +143,8 @@ import org.apache.geode.internal.cache.execute.data.Order; import org.apache.geode.internal.cache.execute.data.OrderId; import org.apache.geode.internal.cache.execute.data.Shipment; import org.apache.geode.internal.cache.execute.data.ShipmentId; +import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage; +import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse; import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException; import org.apache.geode.internal.cache.tier.sockets.CacheServerStats; import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil; @@ -932,9 +935,9 @@ public class WANTestBase extends DistributedTestCase { props.setProperty(JMX_MANAGER_HTTP_PORT, "0"); } props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + locPort + "]"); String logLevel = System.getProperty(LOG_LEVEL, "info"); props.setProperty(LOG_LEVEL, logLevel); + props.setProperty(LOCATORS, "localhost[" + locPort + "]"); InternalDistributedSystem ds = test.getSystem(props); cache = CacheFactory.create(ds); } @@ -1131,12 +1134,57 @@ public class WANTestBase extends DistributedTestCase { return connectionInfo; } + public static void moveAllPrimaryBuckets(String senderId, final DistributedMember destination, + final String regionName) { + + AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId); + final RegionQueue regionQueue; + regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0]; + if (sender.isParallel()) { + ConcurrentParallelGatewaySenderQueue parallelGatewaySenderQueue = + (ConcurrentParallelGatewaySenderQueue) regionQueue; + PartitionedRegion prQ = + parallelGatewaySenderQueue.getRegions().toArray(new PartitionedRegion[1])[0]; + + Set<Integer> primaryBucketIds = prQ.getDataStore().getAllLocalPrimaryBucketIds(); + for (int bid : primaryBucketIds) { + movePrimary(destination, regionName, bid); + } + + // double check after moved all primary buckets + primaryBucketIds = prQ.getDataStore().getAllLocalPrimaryBucketIds(); + assertTrue(primaryBucketIds.isEmpty()); + } + } + + public static void movePrimary(final DistributedMember destination, final String regionName, + final int bucketId) { + PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName); + + BecomePrimaryBucketResponse response = BecomePrimaryBucketMessage + .send((InternalDistributedMember) destination, region, bucketId, true); + assertNotNull(response); + assertTrue(response.waitForResponse()); + } + + public static int getSecondaryQueueSizeInStats(String senderId) { + AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId); + GatewaySenderStats statistics = sender.getStatistics(); + return statistics.getEventSecondaryQueueSize(); + } + public static List<Integer> getSenderStats(String senderId, final int expectedQueueSize) { AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId); GatewaySenderStats statistics = sender.getStatistics(); if (expectedQueueSize != -1) { final RegionQueue regionQueue; regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0]; + if (sender.isParallel()) { + ConcurrentParallelGatewaySenderQueue parallelGatewaySenderQueue = + (ConcurrentParallelGatewaySenderQueue) regionQueue; + PartitionedRegion pr = + parallelGatewaySenderQueue.getRegions().toArray(new PartitionedRegion[1])[0]; + } Awaitility.await().atMost(120, TimeUnit.SECONDS) .until(() -> assertEquals("Expected queue entries: " + expectedQueueSize + " but actual entries: " + regionQueue.size(), expectedQueueSize, @@ -1153,9 +1201,28 @@ public class WANTestBase extends DistributedTestCase { stats.add(statistics.getEventsNotQueuedConflated()); stats.add(statistics.getEventsConflatedFromBatches()); stats.add(statistics.getConflationIndexesMapSize()); + stats.add(statistics.getEventSecondaryQueueSize()); return stats; } + protected static int getTotalBucketQueueSize(PartitionedRegion prQ, boolean isPrimary) { + int size = 0; + if (prQ != null) { + Set<Map.Entry<Integer, BucketRegion>> allBuckets = prQ.getDataStore().getAllLocalBuckets(); + List<Integer> thisProcessorBuckets = new ArrayList<Integer>(); + + for (Map.Entry<Integer, BucketRegion> bucketEntry : allBuckets) { + BucketRegion bucket = bucketEntry.getValue(); + int bId = bucket.getId(); + if ((isPrimary && bucket.getBucketAdvisor().isPrimary()) + || (!isPrimary && !bucket.getBucketAdvisor().isPrimary())) { + size += bucket.size(); + } + } + } + return size; + } + public static List<Integer> getSenderStatsForDroppedEvents(String senderId) { AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId); GatewaySenderStats statistics = sender.getStatistics(); @@ -3117,9 +3184,7 @@ public class WANTestBase extends DistributedTestCase { } if (!sender.isParallel()) { - if (includeSecondary) { - fail("Not implemented yet"); - } + // if sender is serial, the queues will be all primary or all secondary at one member final Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues(); int size = 0; for (RegionQueue q : queues) { @@ -3134,11 +3199,7 @@ public class WANTestBase extends DistributedTestCase { } else if (regionQueue instanceof ParallelGatewaySenderQueue) { return ((ParallelGatewaySenderQueue) regionQueue).localSize(includeSecondary); } else { - if (includeSecondary) { - fail("Not Implemented yet"); - } - regionQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0]; - return regionQueue.getRegion().size(); + fail("Not implemented yet"); } } fail("Not yet implemented?"); diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java index 3281df6..9a85871 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java @@ -62,6 +62,11 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { vm4.invoke(() -> checkQueueSize("ln", (keyValues.size() + updateKeyValues.size()))); + vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), updateKeyValues)); + + // Since no conflation, all updates are in queue + vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + 2 * updateKeyValues.size())); + vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0)); resumeSenders(); @@ -86,7 +91,7 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { vm6.invoke(() -> createSender("ln", 2, true, 100, 50, false, false, null, true)); vm7.invoke(() -> createSender("ln", 2, true, 100, 50, false, false, null, true)); - createSenderPRs(); + createSenderPRs(1); startSenderInVMs("ln", vm4, vm5, vm6, vm7); @@ -103,24 +108,35 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), keyValues)); } + // sender did not turn on conflation, so queue size will be 100 (otherwise it will be 20) + vm4.invoke(() -> checkQueueSize("ln", 100)); vm4.invoke(() -> enableConflation("ln")); vm5.invoke(() -> enableConflation("ln")); vm6.invoke(() -> enableConflation("ln")); vm7.invoke(() -> enableConflation("ln")); - resumeSenders(); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 100)); ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 100)); ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 100)); ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 100)); + assertTrue("Event in secondary queue should be 100", + (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 100); + + resumeSenders(); + + v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); assertTrue("No events conflated in batch", (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0); + assertEquals("Event in secondary queue should be 0 after dispatched", 0, + (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10))); vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10)); @@ -155,12 +171,14 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + updateKeyValues.size())); // creates // aren't // conflated + validateEventSecondaryQueueSize(keyValues.size() + updateKeyValues.size(), redundancy); vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), updateKeyValues)); - vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + updateKeyValues.size())); // creates - // aren't - // conflated + int expectedEventNumAfterConflation = keyValues.size() + updateKeyValues.size(); + vm4.invoke(() -> checkQueueSize("ln", expectedEventNumAfterConflation)); + + validateEventSecondaryQueueSize(expectedEventNumAfterConflation, redundancy); vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0)); @@ -168,6 +186,24 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { keyValues.putAll(updateKeyValues); validateReceiverRegionSize(keyValues); + + // after dispatch, both primary and secondary queues are empty + vm4.invoke(() -> checkQueueSize("ln", 0)); + validateEventSecondaryQueueSize(0, redundancy); + } + + private void validateEventSecondaryQueueSize(int expectedNum, int redundancy) { + ArrayList<Integer> v4List = + (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); + ArrayList<Integer> v5List = + (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); + ArrayList<Integer> v6List = + (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); + ArrayList<Integer> v7List = + (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); + assertTrue("Event in secondary queue should be 100", + (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == expectedNum + * redundancy); } @Test diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java index 9d9ed0f..1edf2a2 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java @@ -53,6 +53,156 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { } @Test + public void testQueueSizeInSecondaryBucketRegionQueuesWithMemberRestart() throws Exception { + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2); + createReceiverInVMs(vm2); + + createSendersWithConflation(lnPort); + + createSenderPRs(1); + + startPausedSenders(); + + createReceiverPR(vm2, 1); + putKeyValues(); + + ArrayList<Integer> v4List = + (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + ArrayList<Integer> v5List = + (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + ArrayList<Integer> v6List = + (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + ArrayList<Integer> v7List = + (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + + assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue + // size + assertEquals(NUM_PUTS * 2, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // eventsReceived + assertEquals(NUM_PUTS * 2, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); // events + // queued + assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events + // distributed + assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary + // queue + // size + + // stop vm7 to trigger rebalance and move some primary buckets + System.out.println("Current secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10) + + ":" + v6List.get(10) + ":" + v7List.get(10)); + vm7.invoke(() -> WANTestBase.closeCache()); + Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> { + int v4secondarySize = vm4.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln")); + int v5secondarySize = vm5.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln")); + int v6secondarySize = vm6.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln")); + assertEquals(NUM_PUTS, v4secondarySize + v5secondarySize + v6secondarySize); // secondary + // queue + // size + }); + System.out.println("New secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10) + ":" + + v6List.get(10)); + + vm7.invoke(() -> WANTestBase.createCache(lnPort)); + vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true)); + vm7.invoke(() -> WANTestBase.createPartitionedRegion(testName, "ln", 1, 10, isOffHeap())); + startSenderInVMs("ln", vm7); + vm7.invoke(() -> pauseSender("ln")); + + v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary + // queue + // size + System.out.println("After restart vm7, secondary queue sizes:" + v4List.get(10) + ":" + + v5List.get(10) + ":" + v6List.get(10) + ":" + v7List.get(10)); + + vm4.invoke(() -> WANTestBase.resumeSender("ln")); + vm5.invoke(() -> WANTestBase.resumeSender("ln")); + vm6.invoke(() -> WANTestBase.resumeSender("ln")); + vm7.invoke(() -> WANTestBase.resumeSender("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS)); + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, NUM_PUTS, NUM_PUTS)); + + vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0)); + + v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events + // distributed + assertEquals(0, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary + // queue + // size + } + + // TODO: add a test without redudency for primary switch + @Test + public void testQueueSizeInSecondaryWithPrimarySwitch() throws Exception { + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2); + createReceiverInVMs(vm2); + + createSendersWithConflation(lnPort); + + createSenderPRs(1); + + startPausedSenders(); + + createReceiverPR(vm2, 1); + + putKeyValues(); + + ArrayList<Integer> v4List = + (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + ArrayList<Integer> v5List = + (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + ArrayList<Integer> v6List = + (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + ArrayList<Integer> v7List = + (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + + assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue + // size + assertEquals(NUM_PUTS * 2, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // eventsReceived + assertEquals(NUM_PUTS * 2, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); // events + // queued + assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events + // distributed + assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary + // queue + // size + + vm4.invoke(() -> WANTestBase.resumeSender("ln")); + vm5.invoke(() -> WANTestBase.resumeSender("ln")); + vm6.invoke(() -> WANTestBase.resumeSender("ln")); + vm7.invoke(() -> WANTestBase.resumeSender("ln")); + vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS)); + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, NUM_PUTS, NUM_PUTS)); + + vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0)); + + v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events + // distributed + assertEquals(0, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary + // queue + // size + } + + @Test public void testPartitionedRegionParallelPropagation_BeforeDispatch() throws Exception { Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java index 7e3248e..1226fde 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java @@ -18,6 +18,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.*; import static org.junit.Assert.*; import java.io.File; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Properties; @@ -101,7 +102,13 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.pauseSender("ln")); vm6.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); - Wait.pause(5000); + ArrayList<Integer> v4List = + (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 1000)); + ArrayList<Integer> v5List = + (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 1000)); + // secondary queue size stats in serial queue should be 0 + assertEquals(0, v4List.get(10) + v5List.get(10)); + HashMap primarySenderUpdates = (HashMap) vm4.invoke(() -> WANTestBase.checkQueue()); HashMap secondarySenderUpdates = (HashMap) vm5.invoke(() -> WANTestBase.checkQueue()); assertEquals(primarySenderUpdates, secondarySenderUpdates); @@ -136,6 +143,9 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { // removing all the keys. secondarySenderUpdates = (HashMap) vm5.invoke(() -> WANTestBase.checkQueue()); assertEquals(secondarySenderUpdates.get("Destroy"), receiverUpdates.get("Create")); + + vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); } protected void checkPrimarySenderUpdatesOnVM5(HashMap primarySenderUpdates) { diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java index 5b14fcc..b66fef2 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java @@ -41,10 +41,10 @@ public class SerialWANConflationDUnitTest extends WANTestBase { createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap())); - vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap())); - vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap())); - vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap())); + vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap())); + vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap())); + vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap())); + vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap())); vm4.invoke(() -> createSender("ln", 2, false, 100, 50, false, false, null, true)); vm5.invoke(() -> createSender("ln", 2, false, 100, 50, false, false, null, true)); @@ -89,6 +89,71 @@ public class SerialWANConflationDUnitTest extends WANTestBase { assertTrue("No events conflated in batch", (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0); + } + + @Test + public void testSerialPropagationPartitionRegionConflationDuringEnqueue() throws Exception { + Integer lnPort = (Integer) vm0.invoke(() -> createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer) vm1.invoke(() -> createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2, vm3); + + vm2.invoke(() -> createPartitionedRegion(getTestMethodName(), null, 1, 8, isOffHeap())); + vm3.invoke(() -> createPartitionedRegion(getTestMethodName(), null, 1, 8, isOffHeap())); + + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap())); + vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap())); + vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap())); + vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap())); + + vm4.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true)); + vm5.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true)); + vm6.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true)); + vm7.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true)); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> pauseSender("ln")); + vm5.invoke(() -> pauseSender("ln")); + vm6.invoke(() -> pauseSender("ln")); + vm7.invoke(() -> pauseSender("ln")); + + + final Map keyValues = new HashMap(); + + for (int i = 1; i <= 10; i++) { + for (int j = 1; j <= 10; j++) { + keyValues.put(j, i); + } + vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), keyValues)); + } + + ArrayList<Integer> v4List = + (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 20)); + assertTrue("After conflation during enqueue, there should be only 20 events", + v4List.get(0) == 20); + + vm4.invoke(() -> resumeSender("ln")); + vm5.invoke(() -> resumeSender("ln")); + vm6.invoke(() -> resumeSender("ln")); + vm7.invoke(() -> resumeSender("ln")); + + v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + ArrayList<Integer> v5List = + (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + ArrayList<Integer> v6List = + (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + ArrayList<Integer> v7List = + (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + assertTrue("No events in secondary queue stats since it's serial sender", + (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 0); + assertTrue("Total queued events should be 100", + (v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)) == 100); vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10)); diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java index e84fd89..87c90e0 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java @@ -163,6 +163,7 @@ public class SerialWANPropagationDUnitTest extends WANTestBase { IgnoredException.addIgnoredException(BatchException70.class.getName()); IgnoredException.addIgnoredException(ServerOperationException.class.getName()); IgnoredException.addIgnoredException(IOException.class.getName()); + IgnoredException.addIgnoredException(java.net.SocketException.class.getName()); vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 10000)); -- To stop receiving notification emails like this one, please contact zho...@apache.org.