This is an automated email from the ASF dual-hosted git repository.
onichols pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.13 by this push:
new c5f1f34 GEODE-8930: Using WaitingThreadPool to process the message.
c5f1f34 is described below
commit c5f1f34468a8f72f7e7113ef40cee2c26d6c015a
Author: Eric Shu <[email protected]>
AuthorDate: Tue Feb 9 12:34:25 2021 -0800
GEODE-8930: Using WaitingThreadPool to process the message.
(cherry picked from develop)
(cherry picked from commit d015751c6f48176989783cfa85a1b4ff4ab3a82e)
---
.../internal/cache/tx/RemoteOperationMessage.java | 18 +++++++-
.../cache/tx/RemoteOperationMessageTest.java | 48 ++++++++++++----------
2 files changed, 42 insertions(+), 24 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteOperationMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteOperationMessage.java
index 9c75fc6..ba49d73 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteOperationMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteOperationMessage.java
@@ -169,14 +169,24 @@ public abstract class RemoteOperationMessage extends DistributionMessage
*/
@Override
public void process(final ClusterDistributionManager dm) {
+ InternalCache cache = getCache(dm);
+ if (cache == null) {
+ String message = getCacheClosedMessage(dm);
+ ReplyException replyException = new ReplyException(new CacheClosedException(message));
+ sendReply(getSender(), this.processorId, dm, replyException, null, 0);
+ return;
+ }
+ dm.getExecutors().getWaitingThreadPool().execute(() -> doRemoteOperation(dm, cache));
+ }
+
+ void doRemoteOperation(ClusterDistributionManager dm, InternalCache cache) {
Throwable thr = null;
boolean sendReply = true;
LocalRegion r = null;
long startTime = 0;
try {
- InternalCache cache = getCache(dm);
if (checkCacheClosing(cache) || checkDSClosing(dm)) {
- String message = "Remote cache is closed: " + dm.getId();
+ String message = getCacheClosedMessage(dm);
if (cache == null) {
thr = new CacheClosedException(message);
} else {
@@ -273,6 +283,10 @@ public abstract class RemoteOperationMessage extends DistributionMessage
}
}
+ private String getCacheClosedMessage(ClusterDistributionManager dm) {
+ return "Remote cache is closed: " + dm.getId();
+ }
+
protected void checkForSystemFailure() {
SystemFailure.checkFailure();
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tx/RemoteOperationMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/RemoteOperationMessageTest.java
index fa24bf9..e10c2f0 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tx/RemoteOperationMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/RemoteOperationMessageTest.java
@@ -30,6 +30,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.concurrent.ExecutorService;
+
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -40,6 +42,7 @@ import org.apache.geode.cache.TransactionException;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.OperationExecutors;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -53,13 +56,10 @@ import org.apache.geode.test.fake.Fakes;
public class RemoteOperationMessageTest {
-
private TestableRemoteOperationMessage msg; // the class under test
- private InternalDistributedMember recipient;
private InternalDistributedMember sender;
private final String regionPath = "regionPath";
- private ReplyProcessor21 processor;
private GemFireCacheImpl cache;
private InternalDistributedSystem system;
@@ -77,13 +77,17 @@ public class RemoteOperationMessageTest {
r = mock(LocalRegion.class);
txMgr = mock(TXManagerImpl.class);
tx = mock(TXStateProxyImpl.class);
+ OperationExecutors executors = mock(OperationExecutors.class);
+ ExecutorService executorService = mock(ExecutorService.class);
when(cache.getRegionByPathForProcessing(regionPath)).thenReturn(r);
when(cache.getTxManager()).thenReturn(txMgr);
+ when(dm.getExecutors()).thenReturn(executors);
+ when(executors.getWaitingThreadPool()).thenReturn(executorService);
sender = mock(InternalDistributedMember.class);
- recipient = mock(InternalDistributedMember.class);
- processor = mock(ReplyProcessor21.class);
+ InternalDistributedMember recipient = mock(InternalDistributedMember.class);
+ ReplyProcessor21 processor = mock(ReplyProcessor21.class);
// make it a spy to aid verification
msg = spy(new TestableRemoteOperationMessage(recipient, regionPath, processor));
}
@@ -93,7 +97,7 @@ public class RemoteOperationMessageTest {
when(txMgr.masqueradeAs(msg)).thenReturn(null);
msg.setSender(sender);
- msg.process(dm);
+ msg.doRemoteOperation(dm, cache);
verify(msg, times(1)).operateOnRegion(dm, r, startTime);
verify(dm, times(1)).putOutgoing(any());
@@ -105,7 +109,7 @@ public class RemoteOperationMessageTest {
when(tx.isInProgress()).thenReturn(true);
msg.setSender(sender);
- msg.process(dm);
+ msg.doRemoteOperation(dm, cache);
verify(msg, times(1)).operateOnRegion(dm, r, startTime);
verify(dm, times(1)).putOutgoing(any());
@@ -117,7 +121,7 @@ public class RemoteOperationMessageTest {
when(tx.isInProgress()).thenReturn(false);
msg.setSender(sender);
- msg.process(dm);
+ msg.doRemoteOperation(dm, cache);
verify(msg, times(0)).operateOnRegion(dm, r, startTime);
// A reply is sent even though we do not call operationOnRegion
@@ -130,7 +134,7 @@ public class RemoteOperationMessageTest {
when(tx.isInProgress()).thenReturn(false);
msg.setSender(sender);
- msg.process(dm);
+ msg.doRemoteOperation(dm, cache);
verify(msg, times(1)).sendReply(
eq(sender),
@@ -147,7 +151,7 @@ public class RemoteOperationMessageTest {
when(txMgr.isClosed()).thenReturn(true);
msg.setSender(sender);
- msg.process(dm);
+ msg.doRemoteOperation(dm, cache);
verify(msg, times(0)).operateOnRegion(dm, r, startTime);
// If we do not respond what prevents the sender from waiting forever?
@@ -176,7 +180,7 @@ public class RemoteOperationMessageTest {
when(cache.getCacheClosedException(any())).thenReturn(reasonCacheWasClosed);
msg.setSender(sender);
- msg.process(dm);
+ msg.doRemoteOperation(dm, cache);
verify(msg, times(0)).operateOnRegion(dm, r, startTime);
verify(dm, times(1)).putOutgoing(any());
ArgumentCaptor<ReplyException> captor = ArgumentCaptor.forClass(ReplyException.class);
@@ -192,7 +196,7 @@ public class RemoteOperationMessageTest {
doNothing().when(msg).checkForSystemFailure();
msg.setSender(sender);
- msg.process(dm);
+ msg.doRemoteOperation(dm, cache);
verify(dm, times(1)).putOutgoing(any());
ArgumentCaptor<ReplyException> captor = ArgumentCaptor.forClass(ReplyException.class);
verify(msg, times(1)).sendReply(any(), anyInt(), eq(dm), captor.capture(), eq(r),
@@ -208,7 +212,7 @@ public class RemoteOperationMessageTest {
when(system.isDisconnecting()).thenReturn(false).thenReturn(true);
msg.setSender(sender);
- msg.process(dm);
+ msg.doRemoteOperation(dm, cache);
verify(dm, times(1)).putOutgoing(any());
ArgumentCaptor<ReplyException> captor = ArgumentCaptor.forClass(ReplyException.class);
verify(msg, times(1)).sendReply(any(), anyInt(), eq(dm), captor.capture(), eq(r),
@@ -223,7 +227,7 @@ public class RemoteOperationMessageTest {
when(msg.operateOnRegion(dm, r, startTime)).thenThrow(ex);
msg.setSender(sender);
- msg.process(dm);
+ msg.doRemoteOperation(dm, cache);
verify(dm, times(1)).putOutgoing(any());
ArgumentCaptor<ReplyException> captor = ArgumentCaptor.forClass(ReplyException.class);
verify(msg, times(1)).sendReply(any(), anyInt(), eq(dm), captor.capture(), eq(r),
@@ -237,7 +241,7 @@ public class RemoteOperationMessageTest {
when(cache.getRegionByPathForProcessing(regionPath)).thenReturn(null);
msg.setSender(sender);
- msg.process(dm);
+ msg.doRemoteOperation(dm, cache);
verify(msg, never()).operateOnRegion(any(), any(), anyLong());
verify(dm, times(1)).putOutgoing(any());
ArgumentCaptor<ReplyException> captor = ArgumentCaptor.forClass(ReplyException.class);
@@ -253,16 +257,16 @@ public class RemoteOperationMessageTest {
.thenThrow(DistributedSystemDisconnectedException.class);
msg.setSender(sender);
- msg.process(dm);
+ msg.doRemoteOperation(dm, cache);
verify(dm, never()).putOutgoing(any());
}
@Test
- public void processWithOperateOnRegionReturningFalseDoesNotSendReply() throws Exception {
+ public void processWithOperateOnRegionReturningFalseDoesNotSendReply() {
msg.setOperationOnRegionResult(false);
msg.setSender(sender);
- msg.process(dm);
+ msg.doRemoteOperation(dm, cache);
verify(dm, never()).putOutgoing(any());
}
@@ -273,7 +277,7 @@ public class RemoteOperationMessageTest {
when(msg.operateOnRegion(dm, r, startTime)).thenThrow(theException);
msg.setSender(sender);
- msg.process(dm);
+ msg.doRemoteOperation(dm, cache);
verify(dm, times(1)).putOutgoing(any());
ArgumentCaptor<ReplyException> captor = ArgumentCaptor.forClass(ReplyException.class);
verify(msg, times(1)).sendReply(any(), anyInt(), eq(dm), captor.capture(), eq(r),
@@ -288,7 +292,7 @@ public class RemoteOperationMessageTest {
doThrow(new RuntimeException("SystemFailure")).when(msg).checkForSystemFailure();
msg.setSender(sender);
- assertThatThrownBy(() -> msg.process(dm)).isInstanceOf(RuntimeException.class)
+ assertThatThrownBy(() -> msg.doRemoteOperation(dm, cache)).isInstanceOf(RuntimeException.class)
.hasMessage("SystemFailure");
verify(dm, times(1)).putOutgoing(any());
ArgumentCaptor<ReplyException> captor = ArgumentCaptor.forClass(ReplyException.class);
@@ -302,7 +306,7 @@ public class RemoteOperationMessageTest {
private boolean operationOnRegionResult = true;
- public TestableRemoteOperationMessage(InternalDistributedMember recipient, String regionPath,
+ TestableRemoteOperationMessage(InternalDistributedMember recipient, String regionPath,
ReplyProcessor21 processor) {
super(recipient, regionPath, processor);
}
@@ -318,7 +322,7 @@ public class RemoteOperationMessageTest {
return operationOnRegionResult;
}
- public void setOperationOnRegionResult(boolean v) {
+ void setOperationOnRegionResult(boolean v) {
this.operationOnRegionResult = v;
}