GEODE-2490: Avoid processing tombstone GC message in-line

Currently the tombstone message sent for replicas are getting
processed in-line instead of handing it to thread pool.
Based on the number of nodes in the cluster, this may take
long time to process, impacting other cache operation that
required to be processed in-line.

The change provided here enables tombstone messages to
be not processed in-line instead processed in separate
thread.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/826bdbfe
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/826bdbfe
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/826bdbfe

Branch: refs/heads/feature/GEODE-2267
Commit: 826bdbfe2ae2f4b3cd27760584684bc35e19e9b7
Parents: 974d72c
Author: Anil <aging...@pivotal.io>
Authored: Wed Feb 15 17:35:22 2017 -0800
Committer: Anil <aging...@pivotal.io>
Committed: Fri Feb 17 17:44:46 2017 -0800

----------------------------------------------------------------------
 .../cache/DistributedTombstoneOperation.java    | 12 ++++
 .../geode/cache30/ClientServerCCEDUnitTest.java | 62 +++++++++++++++++++-
 2 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/826bdbfe/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedTombstoneOperation.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedTombstoneOperation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedTombstoneOperation.java
index 0765e16..1759c86 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedTombstoneOperation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedTombstoneOperation.java
@@ -115,6 +115,12 @@ public class DistributedTombstoneOperation extends 
DistributedCacheOperation {
     return this.regionGCVersions;
   }
 
+  @Override
+  public boolean supportsDirectAck() {
+    // Set to false to force TombstoneMessage to use shared connection w/o 
in-line processing
+    return false;
+  }
+
   public static class TombstoneMessage extends CacheOperationMessage
       implements SerializationVersions {
     // protected long regionVersion;
@@ -130,6 +136,12 @@ public class DistributedTombstoneOperation extends 
DistributedCacheOperation {
     public TombstoneMessage() {}
 
     @Override
+    public int getProcessorType() {
+      // Set to STANDARD to keep it from being processed in-line
+      return DistributionManager.STANDARD_EXECUTOR;
+    }
+
+    @Override
     protected InternalCacheEvent createEvent(DistributedRegion rgn) throws 
EntryNotFoundException {
       RegionEventImpl event = createRegionEvent(rgn);
       event.setEventID(this.eventID);

http://git-wip-us.apache.org/repos/asf/geode/blob/826bdbfe/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
index 75cd95b..e33074d 100644
--- 
a/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
@@ -37,7 +37,6 @@ import 
org.apache.geode.test.junit.categories.ClientServerTest;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.CacheListener;
 import org.apache.geode.cache.DataPolicy;
@@ -51,12 +50,18 @@ import org.apache.geode.cache.client.ClientRegionFactory;
 import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.cache.AbstractRegionEntry;
+import 
org.apache.geode.internal.cache.DistributedTombstoneOperation.TombstoneMessage;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.ha.HARegionQueue;
+import org.apache.geode.internal.cache.partitioned.PRTombstoneMessage;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
+import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.LogWriterUtils;
@@ -219,6 +224,61 @@ public class ClientServerCCEDUnitTest extends 
JUnit4CacheTestCase {
     checkClientDoesNotReceiveGC(vm3);
   }
 
+  @Test
+  public void testTombstoneMessageSentToReplicatesAreNotProcessedInLine() {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+
+    final String name = "Region";
+
+    createServerRegion(vm0, name, true);
+    createEntries(vm0);
+    createServerRegion(vm1, name, true);
+
+    try {
+      vm1.invoke(() -> {
+        DistributionMessageObserver.setInstance(new 
PRTombstoneMessageObserver());
+      });
+      destroyEntries(vm0);
+      forceGC(vm0);
+
+      vm1.invoke(() -> {
+        PRTombstoneMessageObserver mo =
+            (PRTombstoneMessageObserver) 
DistributionMessageObserver.getInstance();
+        Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
+          return mo.tsMessageProcessed >= 1;
+        });
+        assertTrue("Tombstone GC message is not expected.", mo.thName.contains(
+            
LocalizedStrings.DistributionManager_POOLED_MESSAGE_PROCESSOR.toLocalizedString()));
+      });
+
+    } finally {
+      vm1.invoke(() -> {
+        DistributionMessageObserver.setInstance(null);
+      });
+    }
+  }
+
+  private class PRTombstoneMessageObserver extends DistributionMessageObserver 
{
+    public int tsMessageProcessed = 0;
+    public int prTsMessageProcessed = 0;
+    public String thName;
+
+    @Override
+    public void afterProcessMessage(DistributionManager dm, 
DistributionMessage message) {
+      thName = Thread.currentThread().getName();
+
+      if (message instanceof TombstoneMessage) {
+        tsMessageProcessed++;
+      }
+
+      if (message instanceof PRTombstoneMessage) {
+        prTsMessageProcessed++;
+      }
+    }
+  }
+
   /**
    * for bug #40791 we pull tombstones into clients on get(), getAll() and 
registerInterest() to
    * protect the client cache from stray putAll events sitting in backup 
queues on the server

Reply via email to