This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new ab243c8 GEODE-4942: when sender is starting, and not running yet, put event at primary should be saved for QueueRemovalMessage (#1771) ab243c8 is described below commit ab243c890f47f67a55cff344f3f41946512eee24 Author: Xiaojian Zhou <gesterz...@users.noreply.github.com> AuthorDate: Tue Apr 10 19:18:18 2018 -0700 GEODE-4942: when sender is starting, and not running yet, put event at primary should be saved for QueueRemovalMessage (#1771) --- .../asyncqueue/internal/AsyncEventQueueStats.java | 6 +- .../geode/internal/cache/EntryEventImpl.java | 3 + .../internal/cache/PartitionedRegionDataStore.java | 6 +- .../internal/cache/wan/AbstractGatewaySender.java | 15 ++++- .../wan/AbstractGatewaySenderEventProcessor.java | 50 +++++++++++++- .../internal/cache/wan/GatewaySenderStats.java | 16 +++++ .../ConcurrentParallelGatewaySenderQueue.java | 9 +++ .../wan/parallel/ParallelGatewaySenderQueue.java | 20 +++++- .../bean/stats/AsyncEventQueueStatsJUnitTest.java | 18 +++++ .../geode/internal/cache/wan/WANTestBase.java | 78 +++++++++++++++++++--- .../ParallelGatewaySenderOperationsDUnitTest.java | 38 +++++++++-- .../SerialGatewaySenderOperationsDUnitTest.java | 49 +++++++++++++- 12 files changed, 281 insertions(+), 27 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java index dee2c92..2f3029a 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java @@ -26,7 +26,7 @@ public class AsyncEventQueueStats extends GatewaySenderStats { public static final String typeName = "AsyncEventQueueStatistics"; /** The <code>StatisticsType</code> of the statistics */ - private static final StatisticsType type; + public static final StatisticsType type; static { @@ -87,6 +87,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats { f.createIntGauge(CONFLATION_INDEXES_MAP_SIZE, "Current number of entries in the conflation indexes map.", "events"), f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added to queue.", "events"), + f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER, + "Number of events not added to primary queue due to sender yet running.", "events"), f.createIntCounter(EVENTS_FILTERED, "Number of events filtered through GatewayEventFilter.", "events"), f.createIntCounter(LOAD_BALANCES_COMPLETED, "Number of load balances completed", @@ -122,6 +124,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats { unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE); conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE); notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS); + notQueuedEventsAtYetRunningPrimarySenderId = + type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER); eventsFilteredId = type.nameToId(EVENTS_FILTERED); eventsConflatedFromBatchesId = type.nameToId(EVENTS_CONFLATED_FROM_BATCHES); loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java index c91d236..664d054 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java @@ -2158,6 +2158,9 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent, if (this.getInhibitDistribution()) { buf.append(";inhibitDistribution"); } + if (this.tailKey != -1) { + buf.append(";tailKey=" + tailKey); + } buf.append("]"); return buf.toString(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java index ef8eb99..d468ef4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java @@ -2450,10 +2450,10 @@ public class PartitionedRegionDataStore implements HasCachePerfStats { return sizeOfLocalPrimaries; } - public int getSizeOfLocalBuckets(boolean includeSecondary) { + public int getSizeOfLocalBuckets() { int sizeOfLocal = 0; - Set<BucketRegion> primaryBuckets = getAllLocalBucketRegions(); - for (BucketRegion br : primaryBuckets) { + Set<BucketRegion> allLocalBuckets = getAllLocalBucketRegions(); + for (BucketRegion br : allLocalBuckets) { sizeOfLocal += br.size(); } return sizeOfLocal; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index a134e1e..034d810 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -849,7 +849,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi // If this gateway is not running, return if (!isRunning()) { if (isDebugEnabled) { - logger.debug("Returning back without putting into the gateway sender queue"); + logger.debug("Returning back without putting into the gateway sender queue:" + event); + } + if (this.eventProcessor != null) { + this.eventProcessor.registerEventDroppedInPrimaryQueue(event); } return; } @@ -962,7 +965,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi // The sender may have stopped, after we have checked the status in the beginning. if (!isRunning()) { if (isDebugEnabled) { - logger.debug("Returning back without putting into the gateway sender queue"); + logger.debug("Returning back without putting into the gateway sender queue:" + event); + } + if (this.eventProcessor != null) { + this.eventProcessor.registerEventDroppedInPrimaryQueue(event); } return; } @@ -1251,6 +1257,11 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi return localProcessor == null ? 0 : localProcessor.eventQueueSize(); } + public int getEventSecondaryQueueSize() { + AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor; + return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize(); + } + public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) { this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 9309e43..eea7480 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -33,6 +33,7 @@ import org.apache.geode.GemFireException; import org.apache.geode.SystemFailure; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.EntryOperation; import org.apache.geode.cache.Operation; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionDestroyedException; @@ -49,6 +50,7 @@ import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PartitionedRegionHelper; import org.apache.geode.internal.cache.RegionQueue; import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue; @@ -261,15 +263,57 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { } // This should be local size instead of pr size - if (this.queue instanceof ParallelGatewaySenderQueue) { - return ((ParallelGatewaySenderQueue) queue).localSize(); - } if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) { return ((ConcurrentParallelGatewaySenderQueue) queue).localSize(); } return this.queue.size(); } + public int eventSecondaryQueueSize() { + if (queue == null) { + return 0; + } + + // if parallel, get both primary and secondary queues' size, then substract primary queue's size + if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) { + int size = ((ConcurrentParallelGatewaySenderQueue) queue).localSize(true) + - ((ConcurrentParallelGatewaySenderQueue) queue).localSize(false); + return size; + } + return this.queue.size(); + } + + public void registerEventDroppedInPrimaryQueue(EntryEventImpl event) { + if (queue == null) { + return; + } + if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) { + ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue) queue; + PartitionedRegion prQ = cpgsq.getRegion(event.getRegion().getFullPath()); + if (prQ == null) { + if (logger.isDebugEnabled()) { + logger.debug("shadow partitioned region " + event.getRegion().getFullPath() + + " is not created yet."); + } + return; + } + int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) event); + long shadowKey = event.getTailKey(); + + ParallelGatewaySenderQueue pgsq = + (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId); + boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary(); + if (isPrimary) { + pgsq.addRemovedEvent(prQ, bucketId, shadowKey); + this.sender.getStatistics().incEventsNotQueuedAtYetRunningPrimarySender(); + if (logger.isDebugEnabled()) { + logger.debug("register dropped event for primary queue. BucketId is " + bucketId + + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath()); + } + } + } + } + /** * @return the sender */ diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java index c7fd370..2b93082 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java @@ -84,6 +84,8 @@ public class GatewaySenderStats { protected static final String EVENTS_FILTERED = "eventsFiltered"; protected static final String NOT_QUEUED_EVENTS = "notQueuedEvent"; + protected static final String NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER = + "notQueuedEventAtYetRunningPrimarySender"; protected static final String LOAD_BALANCES_COMPLETED = "loadBalancesCompleted"; protected static final String LOAD_BALANCES_IN_PROGRESS = "loadBalancesInProgress"; @@ -135,6 +137,8 @@ public class GatewaySenderStats { protected static int eventsFilteredId; /** Id of not queued events */ protected static int notQueuedEventsId; + /** Id of not queued events due to the primary sender is yet running */ + protected static int notQueuedEventsAtYetRunningPrimarySenderId; /** Id of events conflated in batch */ protected static int eventsConflatedFromBatchesId; /** Id of load balances completed */ @@ -213,6 +217,8 @@ public class GatewaySenderStats { f.createIntGauge(CONFLATION_INDEXES_MAP_SIZE, "Current number of entries in the conflation indexes map.", "events"), f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added to queue.", "events"), + f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER, + "Number of events not added to primary queue due to sender yet running.", "events"), f.createIntCounter(EVENTS_FILTERED, "Number of events filtered through GatewayEventFilter.", "events"), f.createIntCounter(LOAD_BALANCES_COMPLETED, "Number of load balances completed", @@ -249,6 +255,8 @@ public class GatewaySenderStats { unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE); conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE); notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS); + notQueuedEventsAtYetRunningPrimarySenderId = + type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER); eventsFilteredId = type.nameToId(EVENTS_FILTERED); eventsConflatedFromBatchesId = type.nameToId(EVENTS_CONFLATED_FROM_BATCHES); loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED); @@ -599,6 +607,14 @@ public class GatewaySenderStats { return this.stats.getInt(notQueuedEventsId); } + public void incEventsNotQueuedAtYetRunningPrimarySender() { + this.stats.incInt(notQueuedEventsAtYetRunningPrimarySenderId, 1); + } + + public int getEventsNotQueuedAtYetRunningPrimarySender() { + return this.stats.getInt(notQueuedEventsAtYetRunningPrimarySenderId); + } + public void incEventsFiltered() { this.stats.incInt(eventsFilteredId, 1); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java index 4fc940c..e556910 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java @@ -121,6 +121,11 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue { return this.processors[0].getQueue().size(); } + public String displayContent() { + ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) (processors[0].getQueue()); + return pgsq.displayContent(); + } + public int localSize() { return localSize(false); } @@ -190,6 +195,10 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue { return processors[index]; } + public RegionQueue getQueueByBucket(int bucketId) { + return getPGSProcessor(bucketId).getQueue(); + } + public BlockingQueue<GatewaySenderEventImpl> getBucketTmpQueue(int bucketId) { return getPGSProcessor(bucketId).getBucketTmpQueue(bucketId); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 3aa8534..89880fc 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -1112,7 +1112,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { // This method may need synchronization in case it is used by // ConcurrentParallelGatewaySender - protected void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) { + public void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) { StoppableReentrantLock lock = buckToDispatchLock; if (lock != null) { lock.lock(); @@ -1405,12 +1405,28 @@ public class ParallelGatewaySenderQueue implements RegionQueue { return localSize(false); } + public String displayContent() { + int size = 0; + StringBuffer sb = new StringBuffer(); + for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) { + if (prQ != null && prQ.getDataStore() != null) { + Set<BucketRegion> allLocalBuckets = prQ.getDataStore().getAllLocalBucketRegions(); + for (BucketRegion br : allLocalBuckets) { + if (br.size() > 0) { + sb.append("bucketId=" + br.getId() + ":" + br.keySet() + ";"); + } + } + } + } + return sb.toString(); + } + public int localSize(boolean includeSecondary) { int size = 0; for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) { if (prQ != null && prQ.getDataStore() != null) { if (includeSecondary) { - size += prQ.getDataStore().getSizeOfLocalBuckets(true); + size += prQ.getDataStore().getSizeOfLocalBuckets(); } else { size += prQ.getDataStore().getSizeOfLocalPrimaryBuckets(); } diff --git a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java index ea246bc..c4d0b7c 100644 --- a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.geode.StatisticDescriptor; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueStats; import org.apache.geode.management.internal.beans.AsyncEventQueueMBeanBridge; import org.apache.geode.test.junit.categories.IntegrationTest; @@ -60,4 +61,21 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase { return bridge.getEventQueueSize(); } + @Test + public void testStatDescriptors() { + StatisticDescriptor[] sds = asyncEventQueueStats.type.getStatistics(); + int notQueueEvents = 0; + int notQueueToPrimary = 0; + for (StatisticDescriptor s : sds) { + if (s.getName().equals("notQueuedEvent")) { + notQueueEvents++; + } + if (s.getName().equals("notQueuedEventAtYetRunningPrimarySender")) { + notQueueToPrimary++; + } + } + assertEquals(1, notQueueEvents); + assertEquals(1, notQueueToPrimary); + } + } diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java index 226595b..3799083 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -933,6 +933,8 @@ public class WANTestBase extends DistributedTestCase { } props.setProperty(MCAST_PORT, "0"); props.setProperty(LOCATORS, "localhost[" + locPort + "]"); + String logLevel = System.getProperty(LOG_LEVEL, "info"); + props.setProperty(LOG_LEVEL, logLevel); InternalDistributedSystem ds = test.getSystem(props); cache = CacheFactory.create(ds); } @@ -1155,6 +1157,21 @@ public class WANTestBase extends DistributedTestCase { return stats; } + public static List<Integer> getSenderStatsForDroppedEvents(String senderId) { + AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId); + GatewaySenderStats statistics = sender.getStatistics(); + ArrayList<Integer> stats = new ArrayList<Integer>(); + int eventNotQueued = statistics.getEventsNotQueuedAtYetRunningPrimarySender(); + if (eventNotQueued > 0) { + logger.info( + "Found " + eventNotQueued + " not queued events due to primary sender is yet running"); + } + stats.add(eventNotQueued); + stats.add(statistics.getEventsNotQueued()); + stats.add(statistics.getEventsNotQueuedConflated()); + return stats; + } + public static void checkQueueStats(String senderId, final int queueSize, final int eventsReceived, final int eventsQueued, final int eventsDistributed) { GatewaySenderStats statistics = getGatewaySenderStats(senderId); @@ -2746,11 +2763,21 @@ public class WANTestBase extends DistributedTestCase { public static void validateQueueSizeStat(String id, final int queueSize) { final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id); - Awaitility.await().atMost(30, TimeUnit.SECONDS) + Awaitility.await().atMost(60, TimeUnit.SECONDS) .until(() -> assertEquals(queueSize, sender.getEventQueueSize())); assertEquals(queueSize, sender.getEventQueueSize()); } + public static void validateSecondaryQueueSizeStat(String id, final int queueSize) { + final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id); + Awaitility.await().atMost(120, TimeUnit.SECONDS) + .until(() -> assertEquals( + "Expected unprocessedEventMap is drained but actual is " + + sender.getStatistics().getUnprocessedEventMapSize(), + queueSize, sender.getStatistics().getUnprocessedEventMapSize())); + assertEquals(queueSize, sender.getStatistics().getUnprocessedEventMapSize()); + } + /** * This method is specifically written for pause and stop operations. This method validates that * the region size remains same for at least minimum number of verification attempts and also it @@ -3053,6 +3080,31 @@ public class WANTestBase extends DistributedTestCase { }); } + public static Integer getSecondaryQueueContentSize(final String senderId) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender; + int size = abstractSender.getEventSecondaryQueueSize(); + return size; + } + + public static String displayQueueContent(final RegionQueue queue) { + if (queue instanceof ParallelGatewaySenderQueue) { + ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) queue; + return pgsq.displayContent(); + } else if (queue instanceof ConcurrentParallelGatewaySenderQueue) { + ConcurrentParallelGatewaySenderQueue pgsq = (ConcurrentParallelGatewaySenderQueue) queue; + return pgsq.displayContent(); + } + return null; + } + public static Integer getQueueContentSize(final String senderId) { return getQueueContentSize(senderId, false); } @@ -3135,14 +3187,22 @@ public class WANTestBase extends DistributedTestCase { ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0]; Set<BucketRegion> buckets = ((PartitionedRegion) regionQueue.getRegion()).getDataStore() .getAllLocalPrimaryBucketRegions(); - for (final BucketRegion bucket : buckets) { - Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> { - assertEquals("Expected bucket entries for bucket: " + bucket.getId() - + " is: 0 but actual entries: " + bucket.keySet().size() + " This bucket isPrimary: " - + bucket.getBucketAdvisor().isPrimary() + " KEYSET: " + bucket.keySet(), 0, - bucket.keySet().size()); - }); - } // for loop ends + final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender; + RegionQueue queue = abstractSender.getEventProcessor().queue; + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> { + assertEquals("Expected events in all primary queues are drained but actual is " + + abstractSender.getEventQueueSize() + ". Queue content is: " + + displayQueueContent(queue), 0, abstractSender.getEventQueueSize()); + }); + assertEquals("Expected events in all primary queues after drain is 0", 0, + abstractSender.getEventQueueSize()); + Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> { + assertEquals("Expected events in all secondary queues are drained but actual is " + + abstractSender.getEventSecondaryQueueSize() + ". Queue content is: " + + displayQueueContent(queue), 0, abstractSender.getEventSecondaryQueueSize()); + }); + assertEquals("Except events in all secondary queues after drain is 0", 0, + abstractSender.getEventSecondaryQueueSize()); } finally { exp.remove(); exp1.remove(); diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java index eaef4f9..f5b98b7 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java @@ -20,6 +20,8 @@ import static org.apache.geode.internal.cache.tier.sockets.Message.MAX_MESSAGE_S import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException; import static org.assertj.core.api.Assertions.assertThat; +import java.util.ArrayList; + import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -407,18 +409,42 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { vm2.invoke(() -> validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200)); // SECOND RUN: start async puts on region - AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getTestMethodName() + "_PR", 5000)); - - // when puts are happening by another thread, start the senders - startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); - - async.join(); + ArrayList<Integer> vm4List = null; + ArrayList<Integer> vm5List = null; + ArrayList<Integer> vm6List = null; + ArrayList<Integer> vm7List = null; + boolean foundDroppedAtYetStartedPrimarySender = false; + int count = 0; + + do { + stopSenders(); + AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getTestMethodName() + "_PR", 5000)); + + // when puts are happening by another thread, start the senders + startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); + + async.join(); + vm4List = + (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); + vm5List = + (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); + vm6List = + (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); + vm7List = + (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); + if (vm4List.get(0) + vm5List.get(0) + vm6List.get(0) + vm7List.get(0) > 0) { + foundDroppedAtYetStartedPrimarySender = true; + } + count++; + } while (foundDroppedAtYetStartedPrimarySender == false && count < 5); + assertThat(foundDroppedAtYetStartedPrimarySender); // verify all the buckets on all the sender nodes are drained validateParallelSenderQueueAllBucketsDrained(); // verify that the queue size ultimately becomes zero. That means all the events propagate to // remote site. + vm4.invoke(() -> validateQueueContents("ln", 0)); } diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java index ee43b83..8df5650 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java @@ -269,6 +269,53 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase { } @Test + public void testRestartSerialGatewaySendersWhilePutting() throws Throwable { + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createSenderCaches(lnPort); + + createSenderVM4(); + createSenderVM5(); + + createReceiverRegions(); + + createSenderRegions(); + + vm7.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 20)); + + startSenderInVMs("ln", vm4, vm5); + + vm7.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 20)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 20)); + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 20)); + + vm4.invoke(() -> WANTestBase.stopSender("ln")); + vm5.invoke(() -> WANTestBase.stopSender("ln")); + + vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState("ln")); + vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState("ln")); + + vm4.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0)); + vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0)); + + // do a lot of puts while senders are restarting + AsyncInvocation async = vm7.invokeAsync(() -> doPuts(getTestMethodName() + "_RR", 5000)); + + startSenderInVMsAsync("ln", vm4, vm5); + async.join(); + + vm4.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0)); + vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0)); + vm4.invoke(() -> WANTestBase.validateSecondaryQueueSizeStat("ln", 0)); + vm5.invoke(() -> WANTestBase.validateSecondaryQueueSizeStat("ln", 0)); + } + + @Test public void testStopOneSerialGatewaySenderBothPrimary() throws Throwable { Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -298,7 +345,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 200)); vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 200)); - // Do some puts while restarting a sender + // Do some puts from both vm4 and vm5 while restarting a sender AsyncInvocation asyncPuts = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 300)); -- To stop receiving notification emails like this one, please contact zho...@apache.org.