This is an automated email from the ASF dual-hosted git repository.

onichols pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.13 by this push:
     new c5f1f34  GEODE-8930: Using WaitingThreadPool to process the message.
c5f1f34 is described below

commit c5f1f34468a8f72f7e7113ef40cee2c26d6c015a
Author: Eric Shu <[email protected]>
AuthorDate: Tue Feb 9 12:34:25 2021 -0800

    GEODE-8930: Using WaitingThreadPool to process the message.
    
    (cherry picked from develop)
    
    (cherry picked from commit d015751c6f48176989783cfa85a1b4ff4ab3a82e)
---
 .../internal/cache/tx/RemoteOperationMessage.java  | 18 +++++++-
 .../cache/tx/RemoteOperationMessageTest.java       | 48 ++++++++++++----------
 2 files changed, 42 insertions(+), 24 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteOperationMessage.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteOperationMessage.java
index 9c75fc6..ba49d73 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteOperationMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteOperationMessage.java
@@ -169,14 +169,24 @@ public abstract class RemoteOperationMessage extends 
DistributionMessage
    */
   @Override
   public void process(final ClusterDistributionManager dm) {
+    InternalCache cache = getCache(dm);
+    if (cache == null) {
+      String message = getCacheClosedMessage(dm);
+      ReplyException replyException = new ReplyException(new 
CacheClosedException(message));
+      sendReply(getSender(), this.processorId, dm, replyException, null, 0);
+      return;
+    }
+    dm.getExecutors().getWaitingThreadPool().execute(() -> 
doRemoteOperation(dm, cache));
+  }
+
+  void doRemoteOperation(ClusterDistributionManager dm, InternalCache cache) {
     Throwable thr = null;
     boolean sendReply = true;
     LocalRegion r = null;
     long startTime = 0;
     try {
-      InternalCache cache = getCache(dm);
       if (checkCacheClosing(cache) || checkDSClosing(dm)) {
-        String message = "Remote cache is closed: " + dm.getId();
+        String message = getCacheClosedMessage(dm);
         if (cache == null) {
           thr = new CacheClosedException(message);
         } else {
@@ -273,6 +283,10 @@ public abstract class RemoteOperationMessage extends 
DistributionMessage
     }
   }
 
+  private String getCacheClosedMessage(ClusterDistributionManager dm) {
+    return "Remote cache is closed: " + dm.getId();
+  }
+
   protected void checkForSystemFailure() {
     SystemFailure.checkFailure();
   }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tx/RemoteOperationMessageTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/RemoteOperationMessageTest.java
index fa24bf9..e10c2f0 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tx/RemoteOperationMessageTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/RemoteOperationMessageTest.java
@@ -30,6 +30,8 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.concurrent.ExecutorService;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -40,6 +42,7 @@ import org.apache.geode.cache.TransactionException;
 import org.apache.geode.distributed.DistributedSystemDisconnectedException;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.OperationExecutors;
 import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -53,13 +56,10 @@ import org.apache.geode.test.fake.Fakes;
 
 
 public class RemoteOperationMessageTest {
-
   private TestableRemoteOperationMessage msg; // the class under test
 
-  private InternalDistributedMember recipient;
   private InternalDistributedMember sender;
   private final String regionPath = "regionPath";
-  private ReplyProcessor21 processor;
 
   private GemFireCacheImpl cache;
   private InternalDistributedSystem system;
@@ -77,13 +77,17 @@ public class RemoteOperationMessageTest {
     r = mock(LocalRegion.class);
     txMgr = mock(TXManagerImpl.class);
     tx = mock(TXStateProxyImpl.class);
+    OperationExecutors executors = mock(OperationExecutors.class);
+    ExecutorService executorService = mock(ExecutorService.class);
     when(cache.getRegionByPathForProcessing(regionPath)).thenReturn(r);
     when(cache.getTxManager()).thenReturn(txMgr);
+    when(dm.getExecutors()).thenReturn(executors);
+    when(executors.getWaitingThreadPool()).thenReturn(executorService);
 
     sender = mock(InternalDistributedMember.class);
 
-    recipient = mock(InternalDistributedMember.class);
-    processor = mock(ReplyProcessor21.class);
+    InternalDistributedMember recipient = 
mock(InternalDistributedMember.class);
+    ReplyProcessor21 processor = mock(ReplyProcessor21.class);
     // make it a spy to aid verification
     msg = spy(new TestableRemoteOperationMessage(recipient, regionPath, 
processor));
   }
@@ -93,7 +97,7 @@ public class RemoteOperationMessageTest {
     when(txMgr.masqueradeAs(msg)).thenReturn(null);
     msg.setSender(sender);
 
-    msg.process(dm);
+    msg.doRemoteOperation(dm, cache);
 
     verify(msg, times(1)).operateOnRegion(dm, r, startTime);
     verify(dm, times(1)).putOutgoing(any());
@@ -105,7 +109,7 @@ public class RemoteOperationMessageTest {
     when(tx.isInProgress()).thenReturn(true);
     msg.setSender(sender);
 
-    msg.process(dm);
+    msg.doRemoteOperation(dm, cache);
 
     verify(msg, times(1)).operateOnRegion(dm, r, startTime);
     verify(dm, times(1)).putOutgoing(any());
@@ -117,7 +121,7 @@ public class RemoteOperationMessageTest {
     when(tx.isInProgress()).thenReturn(false);
     msg.setSender(sender);
 
-    msg.process(dm);
+    msg.doRemoteOperation(dm, cache);
 
     verify(msg, times(0)).operateOnRegion(dm, r, startTime);
     // A reply is sent even though we do not call operationOnRegion
@@ -130,7 +134,7 @@ public class RemoteOperationMessageTest {
     when(tx.isInProgress()).thenReturn(false);
     msg.setSender(sender);
 
-    msg.process(dm);
+    msg.doRemoteOperation(dm, cache);
 
     verify(msg, times(1)).sendReply(
         eq(sender),
@@ -147,7 +151,7 @@ public class RemoteOperationMessageTest {
     when(txMgr.isClosed()).thenReturn(true);
     msg.setSender(sender);
 
-    msg.process(dm);
+    msg.doRemoteOperation(dm, cache);
 
     verify(msg, times(0)).operateOnRegion(dm, r, startTime);
     // If we do not respond what prevents the sender from waiting forever?
@@ -176,7 +180,7 @@ public class RemoteOperationMessageTest {
     
when(cache.getCacheClosedException(any())).thenReturn(reasonCacheWasClosed);
     msg.setSender(sender);
 
-    msg.process(dm);
+    msg.doRemoteOperation(dm, cache);
     verify(msg, times(0)).operateOnRegion(dm, r, startTime);
     verify(dm, times(1)).putOutgoing(any());
     ArgumentCaptor<ReplyException> captor = 
ArgumentCaptor.forClass(ReplyException.class);
@@ -192,7 +196,7 @@ public class RemoteOperationMessageTest {
     doNothing().when(msg).checkForSystemFailure();
     msg.setSender(sender);
 
-    msg.process(dm);
+    msg.doRemoteOperation(dm, cache);
     verify(dm, times(1)).putOutgoing(any());
     ArgumentCaptor<ReplyException> captor = 
ArgumentCaptor.forClass(ReplyException.class);
     verify(msg, times(1)).sendReply(any(), anyInt(), eq(dm), captor.capture(), 
eq(r),
@@ -208,7 +212,7 @@ public class RemoteOperationMessageTest {
     when(system.isDisconnecting()).thenReturn(false).thenReturn(true);
     msg.setSender(sender);
 
-    msg.process(dm);
+    msg.doRemoteOperation(dm, cache);
     verify(dm, times(1)).putOutgoing(any());
     ArgumentCaptor<ReplyException> captor = 
ArgumentCaptor.forClass(ReplyException.class);
     verify(msg, times(1)).sendReply(any(), anyInt(), eq(dm), captor.capture(), 
eq(r),
@@ -223,7 +227,7 @@ public class RemoteOperationMessageTest {
     when(msg.operateOnRegion(dm, r, startTime)).thenThrow(ex);
     msg.setSender(sender);
 
-    msg.process(dm);
+    msg.doRemoteOperation(dm, cache);
     verify(dm, times(1)).putOutgoing(any());
     ArgumentCaptor<ReplyException> captor = 
ArgumentCaptor.forClass(ReplyException.class);
     verify(msg, times(1)).sendReply(any(), anyInt(), eq(dm), captor.capture(), 
eq(r),
@@ -237,7 +241,7 @@ public class RemoteOperationMessageTest {
     when(cache.getRegionByPathForProcessing(regionPath)).thenReturn(null);
     msg.setSender(sender);
 
-    msg.process(dm);
+    msg.doRemoteOperation(dm, cache);
     verify(msg, never()).operateOnRegion(any(), any(), anyLong());
     verify(dm, times(1)).putOutgoing(any());
     ArgumentCaptor<ReplyException> captor = 
ArgumentCaptor.forClass(ReplyException.class);
@@ -253,16 +257,16 @@ public class RemoteOperationMessageTest {
         .thenThrow(DistributedSystemDisconnectedException.class);
     msg.setSender(sender);
 
-    msg.process(dm);
+    msg.doRemoteOperation(dm, cache);
     verify(dm, never()).putOutgoing(any());
   }
 
   @Test
-  public void processWithOperateOnRegionReturningFalseDoesNotSendReply() 
throws Exception {
+  public void processWithOperateOnRegionReturningFalseDoesNotSendReply() {
     msg.setOperationOnRegionResult(false);
     msg.setSender(sender);
 
-    msg.process(dm);
+    msg.doRemoteOperation(dm, cache);
     verify(dm, never()).putOutgoing(any());
   }
 
@@ -273,7 +277,7 @@ public class RemoteOperationMessageTest {
     when(msg.operateOnRegion(dm, r, startTime)).thenThrow(theException);
     msg.setSender(sender);
 
-    msg.process(dm);
+    msg.doRemoteOperation(dm, cache);
     verify(dm, times(1)).putOutgoing(any());
     ArgumentCaptor<ReplyException> captor = 
ArgumentCaptor.forClass(ReplyException.class);
     verify(msg, times(1)).sendReply(any(), anyInt(), eq(dm), captor.capture(), 
eq(r),
@@ -288,7 +292,7 @@ public class RemoteOperationMessageTest {
     doThrow(new 
RuntimeException("SystemFailure")).when(msg).checkForSystemFailure();
     msg.setSender(sender);
 
-    assertThatThrownBy(() -> 
msg.process(dm)).isInstanceOf(RuntimeException.class)
+    assertThatThrownBy(() -> msg.doRemoteOperation(dm, 
cache)).isInstanceOf(RuntimeException.class)
         .hasMessage("SystemFailure");
     verify(dm, times(1)).putOutgoing(any());
     ArgumentCaptor<ReplyException> captor = 
ArgumentCaptor.forClass(ReplyException.class);
@@ -302,7 +306,7 @@ public class RemoteOperationMessageTest {
 
     private boolean operationOnRegionResult = true;
 
-    public TestableRemoteOperationMessage(InternalDistributedMember recipient, 
String regionPath,
+    TestableRemoteOperationMessage(InternalDistributedMember recipient, String 
regionPath,
         ReplyProcessor21 processor) {
       super(recipient, regionPath, processor);
     }
@@ -318,7 +322,7 @@ public class RemoteOperationMessageTest {
       return operationOnRegionResult;
     }
 
-    public void setOperationOnRegionResult(boolean v) {
+    void setOperationOnRegionResult(boolean v) {
       this.operationOnRegionResult = v;
     }
 

Reply via email to