This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-5249
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 40f51112fb8759c4063f650b3bb7b7a90cac59dc
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Wed May 23 17:50:28 2018 -0700

    GEODE-5249: add test code to display serial gateway sender queue's content
---
 .../wan/AbstractGatewaySenderEventProcessor.java   |  9 +++++
 ...oncurrentSerialGatewaySenderEventProcessor.java | 16 +++++++++
 .../serial/SerialGatewaySenderEventProcessor.java  | 17 +++++++++
 .../cache/wan/serial/SerialGatewaySenderQueue.java |  4 +++
 ...SerialGatewaySenderEventProcessorJUnitTest.java | 33 ++++++++++++++++++
 .../geode/internal/cache/wan/WANTestBase.java      | 40 ++++++++++++++++++++++
 .../SerialGatewaySenderOperationsDUnitTest.java    | 34 ++++++++++++++++++
 7 files changed, 153 insertions(+)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 89fa586..0004789 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -1380,4 +1380,13 @@ public abstract class 
AbstractGatewaySenderEventProcessor extends Thread {
       return true;
     }
   }
+
+  public String printUnprocessedEvents() {
+    return null;
+  }
+
+  public String printUnprocessedTokens() {
+    return null;
+  }
+
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index 8ec6ce1..195e1a7 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -396,4 +396,20 @@ public class ConcurrentSerialGatewaySenderEventProcessor
       serialProcessor.enqueueEvent(event);
     }
   }
+
+  public String printUnprocessedEvents() {
+    StringBuffer sb = new StringBuffer();
+    for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
+      sb.append(serialProcessor.printUnprocessedEvents() + "\n");
+    }
+    return sb.toString();
+  }
+
+  public String printUnprocessedTokens() {
+    StringBuffer sb = new StringBuffer();
+    for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
+      sb.append(serialProcessor.printUnprocessedTokens() + "\n");
+    }
+    return sb.toString();
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index 39609c7..121469f 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -20,12 +20,14 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.logging.log4j.Logger;
 
@@ -902,4 +904,19 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
     this.getSender().setModifiedEventId(droppedEvent);
     sendBatchDestroyOperationForDroppedEvent(droppedEvent, -1);
   }
+
+  private String printEventIdList(Set<EventID> eventIds) {
+    StringBuffer sb = new StringBuffer().append("[").append(
+        eventIds.stream().map(entry -> 
entry.expensiveToString()).collect(Collectors.joining(", ")))
+        .append("]");
+    return sb.toString();
+  }
+
+  public String printUnprocessedEvents() {
+    return printEventIdList(this.unprocessedEvents.keySet());
+  }
+
+  public String printUnprocessedTokens() {
+    return printEventIdList(this.unprocessedTokens.keySet());
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 64e3a6a..6cfe7f4 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -1294,4 +1294,8 @@ public class SerialGatewaySenderQueue implements 
RegionQueue {
       }
     }
   }
+
+  public String displayContent() {
+    return this.region.keySet().toString();
+  }
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java
index f21634e..6cb0fdb 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java
@@ -14,23 +14,30 @@
  */
 package org.apache.geode.internal.cache.wan.serial;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.Map;
+
+import org.apache.logging.log4j.Logger;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.springframework.test.util.ReflectionTestUtils;
 
 import org.apache.geode.cache.Operation;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.GatewaySenderStats;
+import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -40,6 +47,8 @@ public class SerialGatewaySenderEventProcessorJUnitTest {
 
   private TestSerialGatewaySenderEventProcessor processor;
 
+  private static final Logger logger = LogService.getLogger();
+
   @Before
   public void setUp() throws Exception {
     this.sender = mock(AbstractGatewaySender.class);
@@ -104,6 +113,30 @@ public class SerialGatewaySenderEventProcessorJUnitTest {
     }
   }
 
+  @Test
+  public void validateUnProcessedEventsList() {
+    Map<EventID, AbstractGatewaySender.EventWrapper> unprocessedEvents =
+        (Map<EventID, AbstractGatewaySender.EventWrapper>) 
ReflectionTestUtils.getField(processor,
+            "unprocessedEvents");
+
+    long complexThreadId1 = 
ThreadIdentifier.createFakeThreadIDForParallelGSPrimaryBucket(0, 1, 1);
+    long complexThreadId3 = 
ThreadIdentifier.createFakeThreadIDForParallelGSPrimaryBucket(0, 3, 3);
+    unprocessedEvents.put(new EventID("mem1".getBytes(), complexThreadId1, 
1L), null);
+    unprocessedEvents.put(new EventID("mem2".getBytes(), 2L, 2L), null);
+
+    String unProcessedEvents = this.processor.printUnprocessedEvents();
+    logger.info("UnprocessedEvents: " + unProcessedEvents);
+    
assertThat(unProcessedEvents).contains("threadID=0x1010000|1;sequenceID=1");
+    assertThat(unProcessedEvents).contains("threadID=2;sequenceID=2");
+
+    processor.unprocessedTokens.put(new EventID("mem3".getBytes(), 
complexThreadId3, 3L), 3L);
+    processor.unprocessedTokens.put(new EventID("mem4".getBytes(), 4L, 4L), 
4L);
+    String unProcessedTokens = this.processor.printUnprocessedTokens();
+    logger.info("UnprocessedTokens: " + unProcessedTokens);
+    
assertThat(unProcessedTokens).contains("threadID=0x3010000|3;sequenceID=3");
+    assertThat(unProcessedTokens).contains("threadID=4;sequenceID=4");
+  }
+
   private EventID handlePrimaryEvent() {
     GatewaySenderEventImpl gsei = mock(GatewaySenderEventImpl.class);
     EventID id = mock(EventID.class);
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index a7a281a..6b11349 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -37,6 +37,7 @@ import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATO
 import static 
org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
 import static org.apache.geode.test.dunit.Host.getHost;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -71,6 +72,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import javax.management.ObjectName;
 
@@ -1249,6 +1251,15 @@ public class WANTestBase extends DistributedTestCase {
     assert (statistics.getEventsDistributed() >= eventsDistributed);
   }
 
+  public static void validateGatewaySenderQueueHasContent(String senderId, 
VM... targetVms) {
+    Awaitility.await().atMost(60, TimeUnit.SECONDS)
+        .until(() -> Arrays.stream(targetVms).map(vm -> vm.invoke(() -> {
+          AbstractGatewaySender sender = (AbstractGatewaySender) 
cache.getGatewaySender(senderId);
+          logger.info(displaySerialQueueContent(sender));
+          return sender.getEventQueueSize();
+        })).mapToInt(i -> i).sum(), greaterThan(0));
+  }
+
   public static void checkGatewayReceiverStats(int processBatches, int 
eventsReceived,
       int creates) {
     Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
@@ -3171,6 +3182,35 @@ public class WANTestBase extends DistributedTestCase {
     return null;
   }
 
+  public static String displaySerialQueueContent(final AbstractGatewaySender 
sender) {
+    StringBuilder message = new StringBuilder();
+    message.append("Is Primary: ").append(sender.isPrimary()).append(", 
").append("Queue Size: ")
+        .append(sender.getEventQueueSize());
+
+    if (sender.getQueues() != null) {
+      message.append(", ").append("Queue Count: 
").append(sender.getQueues().size());
+      Stream<Object> stream = sender.getQueues().stream()
+          .map(regionQueue -> ((SerialGatewaySenderQueue) 
regionQueue).displayContent());
+
+      List<Object> list = stream.collect(Collectors.toList());
+      message.append(", ").append("Keys: ").append(list.toString());
+    }
+
+    AbstractGatewaySenderEventProcessor abstractProcessor = 
sender.getEventProcessor();
+    if (abstractProcessor == null) {
+      message.append(", ").append("Null Event Processor: ");
+    }
+    if (sender.isPrimary()) {
+      message.append(", ").append("Unprocessed Tokens: ")
+          .append(abstractProcessor.printUnprocessedTokens());
+    } else {
+      message.append(", ").append("Unprocessed Events: ")
+          .append(abstractProcessor.printUnprocessedEvents());
+    }
+
+    return message.toString();
+  }
+
   public static Integer getQueueContentSize(final String senderId) {
     return getQueueContentSize(senderId, false);
   }
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
index 4993f24..a729541 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
@@ -313,6 +313,40 @@ public class SerialGatewaySenderOperationsDUnitTest 
extends WANTestBase {
   }
 
   @Test
+  public void testSerialGatewaySendersPrintQueueContents() throws Throwable {
+    Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createSenderCaches(lnPort);
+
+    createSenderVM4();
+    createSenderVM5();
+
+    createReceiverRegions();
+
+    createSenderRegions();
+
+    startSenderInVMs("ln", vm4, vm5);
+    vm4.invoke(() -> pauseSender("ln"));
+    vm5.invoke(() -> pauseSender("ln"));
+
+    vm7.invoke(() -> doPuts(getTestMethodName() + "_RR", 20));
+
+    validateGatewaySenderQueueHasContent("ln", vm4, vm5);
+
+    vm4.invokeAsync(() -> resumeSender("ln"));
+    vm5.invokeAsync(() -> resumeSender("ln"));
+
+    vm4.invoke(() -> validateQueueSizeStat("ln", 0));
+    vm5.invoke(() -> validateQueueSizeStat("ln", 0));
+    vm4.invoke(() -> validateSecondaryQueueSizeStat("ln", 0));
+    vm5.invoke(() -> validateSecondaryQueueSizeStat("ln", 0));
+  }
+
+  @Test
   public void testStopOneSerialGatewaySenderBothPrimary() throws Throwable {
     Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
     Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));

-- 
To stop receiving notification emails like this one, please contact
zho...@apache.org.

Reply via email to