This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-5249 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 40f51112fb8759c4063f650b3bb7b7a90cac59dc Author: zhouxh <gz...@pivotal.io> AuthorDate: Wed May 23 17:50:28 2018 -0700 GEODE-5249: add test code to display serial gateway sender queue's content --- .../wan/AbstractGatewaySenderEventProcessor.java | 9 +++++ ...oncurrentSerialGatewaySenderEventProcessor.java | 16 +++++++++ .../serial/SerialGatewaySenderEventProcessor.java | 17 +++++++++ .../cache/wan/serial/SerialGatewaySenderQueue.java | 4 +++ ...SerialGatewaySenderEventProcessorJUnitTest.java | 33 ++++++++++++++++++ .../geode/internal/cache/wan/WANTestBase.java | 40 ++++++++++++++++++++++ .../SerialGatewaySenderOperationsDUnitTest.java | 34 ++++++++++++++++++ 7 files changed, 153 insertions(+) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 89fa586..0004789 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -1380,4 +1380,13 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { return true; } } + + public String printUnprocessedEvents() { + return null; + } + + public String printUnprocessedTokens() { + return null; + } + } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java index 8ec6ce1..195e1a7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java @@ -396,4 +396,20 @@ public class ConcurrentSerialGatewaySenderEventProcessor serialProcessor.enqueueEvent(event); } } + + public String printUnprocessedEvents() { + StringBuffer sb = new StringBuffer(); + for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) { + sb.append(serialProcessor.printUnprocessedEvents() + "\n"); + } + return sb.toString(); + } + + public String printUnprocessedTokens() { + StringBuffer sb = new StringBuffer(); + for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) { + sb.append(serialProcessor.printUnprocessedTokens() + "\n"); + } + return sb.toString(); + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java index 39609c7..121469f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java @@ -20,12 +20,14 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.logging.log4j.Logger; @@ -902,4 +904,19 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven this.getSender().setModifiedEventId(droppedEvent); sendBatchDestroyOperationForDroppedEvent(droppedEvent, -1); } + + private String printEventIdList(Set<EventID> eventIds) { + StringBuffer sb = new StringBuffer().append("[").append( + eventIds.stream().map(entry -> entry.expensiveToString()).collect(Collectors.joining(", "))) + .append("]"); + return sb.toString(); + } + + public String printUnprocessedEvents() { + return printEventIdList(this.unprocessedEvents.keySet()); + } + + public String printUnprocessedTokens() { + return printEventIdList(this.unprocessedTokens.keySet()); + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java index 64e3a6a..6cfe7f4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java @@ -1294,4 +1294,8 @@ public class SerialGatewaySenderQueue implements RegionQueue { } } } + + public String displayContent() { + return this.region.keySet().toString(); + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java index f21634e..6cb0fdb 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java @@ -14,23 +14,30 @@ */ package org.apache.geode.internal.cache.wan.serial; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Map; + +import org.apache.logging.log4j.Logger; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.springframework.test.util.ReflectionTestUtils; import org.apache.geode.cache.Operation; import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.ha.ThreadIdentifier; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; import org.apache.geode.internal.cache.wan.GatewaySenderStats; +import org.apache.geode.internal.logging.LogService; import org.apache.geode.test.junit.categories.UnitTest; @Category(UnitTest.class) @@ -40,6 +47,8 @@ public class SerialGatewaySenderEventProcessorJUnitTest { private TestSerialGatewaySenderEventProcessor processor; + private static final Logger logger = LogService.getLogger(); + @Before public void setUp() throws Exception { this.sender = mock(AbstractGatewaySender.class); @@ -104,6 +113,30 @@ public class SerialGatewaySenderEventProcessorJUnitTest { } } + @Test + public void validateUnProcessedEventsList() { + Map<EventID, AbstractGatewaySender.EventWrapper> unprocessedEvents = + (Map<EventID, AbstractGatewaySender.EventWrapper>) ReflectionTestUtils.getField(processor, + "unprocessedEvents"); + + long complexThreadId1 = ThreadIdentifier.createFakeThreadIDForParallelGSPrimaryBucket(0, 1, 1); + long complexThreadId3 = ThreadIdentifier.createFakeThreadIDForParallelGSPrimaryBucket(0, 3, 3); + unprocessedEvents.put(new EventID("mem1".getBytes(), complexThreadId1, 1L), null); + unprocessedEvents.put(new EventID("mem2".getBytes(), 2L, 2L), null); + + String unProcessedEvents = this.processor.printUnprocessedEvents(); + logger.info("UnprocessedEvents: " + unProcessedEvents); + assertThat(unProcessedEvents).contains("threadID=0x1010000|1;sequenceID=1"); + assertThat(unProcessedEvents).contains("threadID=2;sequenceID=2"); + + processor.unprocessedTokens.put(new EventID("mem3".getBytes(), complexThreadId3, 3L), 3L); + processor.unprocessedTokens.put(new EventID("mem4".getBytes(), 4L, 4L), 4L); + String unProcessedTokens = this.processor.printUnprocessedTokens(); + logger.info("UnprocessedTokens: " + unProcessedTokens); + assertThat(unProcessedTokens).contains("threadID=0x3010000|3;sequenceID=3"); + assertThat(unProcessedTokens).contains("threadID=4;sequenceID=4"); + } + private EventID handlePrimaryEvent() { GatewaySenderEventImpl gsei = mock(GatewaySenderEventImpl.class); EventID id = mock(EventID.class); diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java index a7a281a..6b11349 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -37,6 +37,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATO import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR; import static org.apache.geode.test.dunit.Host.getHost; import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -71,6 +72,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.management.ObjectName; @@ -1249,6 +1251,15 @@ public class WANTestBase extends DistributedTestCase { assert (statistics.getEventsDistributed() >= eventsDistributed); } + public static void validateGatewaySenderQueueHasContent(String senderId, VM... targetVms) { + Awaitility.await().atMost(60, TimeUnit.SECONDS) + .until(() -> Arrays.stream(targetVms).map(vm -> vm.invoke(() -> { + AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId); + logger.info(displaySerialQueueContent(sender)); + return sender.getEventQueueSize(); + })).mapToInt(i -> i).sum(), greaterThan(0)); + } + public static void checkGatewayReceiverStats(int processBatches, int eventsReceived, int creates) { Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers(); @@ -3171,6 +3182,35 @@ public class WANTestBase extends DistributedTestCase { return null; } + public static String displaySerialQueueContent(final AbstractGatewaySender sender) { + StringBuilder message = new StringBuilder(); + message.append("Is Primary: ").append(sender.isPrimary()).append(", ").append("Queue Size: ") + .append(sender.getEventQueueSize()); + + if (sender.getQueues() != null) { + message.append(", ").append("Queue Count: ").append(sender.getQueues().size()); + Stream<Object> stream = sender.getQueues().stream() + .map(regionQueue -> ((SerialGatewaySenderQueue) regionQueue).displayContent()); + + List<Object> list = stream.collect(Collectors.toList()); + message.append(", ").append("Keys: ").append(list.toString()); + } + + AbstractGatewaySenderEventProcessor abstractProcessor = sender.getEventProcessor(); + if (abstractProcessor == null) { + message.append(", ").append("Null Event Processor: "); + } + if (sender.isPrimary()) { + message.append(", ").append("Unprocessed Tokens: ") + .append(abstractProcessor.printUnprocessedTokens()); + } else { + message.append(", ").append("Unprocessed Events: ") + .append(abstractProcessor.printUnprocessedEvents()); + } + + return message.toString(); + } + public static Integer getQueueContentSize(final String senderId) { return getQueueContentSize(senderId, false); } diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java index 4993f24..a729541 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java @@ -313,6 +313,40 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase { } @Test + public void testSerialGatewaySendersPrintQueueContents() throws Throwable { + Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createSenderCaches(lnPort); + + createSenderVM4(); + createSenderVM5(); + + createReceiverRegions(); + + createSenderRegions(); + + startSenderInVMs("ln", vm4, vm5); + vm4.invoke(() -> pauseSender("ln")); + vm5.invoke(() -> pauseSender("ln")); + + vm7.invoke(() -> doPuts(getTestMethodName() + "_RR", 20)); + + validateGatewaySenderQueueHasContent("ln", vm4, vm5); + + vm4.invokeAsync(() -> resumeSender("ln")); + vm5.invokeAsync(() -> resumeSender("ln")); + + vm4.invoke(() -> validateQueueSizeStat("ln", 0)); + vm5.invoke(() -> validateQueueSizeStat("ln", 0)); + vm4.invoke(() -> validateSecondaryQueueSizeStat("ln", 0)); + vm5.invoke(() -> validateSecondaryQueueSizeStat("ln", 0)); + } + + @Test public void testStopOneSerialGatewaySenderBothPrimary() throws Throwable { Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); -- To stop receiving notification emails like this one, please contact zho...@apache.org.