This is an automated email from the ASF dual-hosted git repository.
onichols pushed a commit to branch release/1.10.0
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/release/1.10.0 by this push:
new 6dde1cd GEODE-7055: Don't send failure replies from a P2P reader thread
6dde1cd is described below
commit 6dde1cdeffcdd169f26242c5a9ffcc2b40374e0b
Author: Dan Smith <[email protected]>
AuthorDate: Wed Aug 7 11:58:59 2019 -0700
GEODE-7055: Don't send failure replies from a P2P reader thread
We were hitting a deadlock during startup if a P2P reader thread tried
to send a failure reply - it would block waiting for startup to finish,
but startup would not finish until the P2P reader thread could read a
startup response.
Send the failure reply in a separate thread, to make sure we always
unblock the P2P reader thread to read new messages.
(cherry picked from commit 1438b56bf7ef44e758bb4fc157dfca2cff4e2c99)
---
.../distributed/DistributedSystemDUnitTest.java | 63 ++++++++++++++++++++++
.../org/apache/geode/internal/tcp/Connection.java | 12 ++---
2 files changed, 69 insertions(+), 6 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/distributed/DistributedSystemDUnitTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/distributed/DistributedSystemDUnitTest.java
index 4a0401c..d2fa9d7 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/distributed/DistributedSystemDUnitTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/distributed/DistributedSystemDUnitTest.java
@@ -41,6 +41,9 @@ import static
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
@@ -57,14 +60,20 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.CancelException;
+import org.apache.geode.DataSerializable;
import org.apache.geode.GemFireConfigException;
+import org.apache.geode.SerializationException;
import org.apache.geode.SystemConnectException;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.SerialDistributionMessage;
import org.apache.geode.distributed.internal.SizeableRunnable;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -321,6 +330,37 @@ public class DistributedSystemDUnitTest extends
JUnit4DistributedTestCase {
});
}
+ /**
+ * Regression test for GEODE-7055: a member that receives a message it cannot
+ * deserialize while it is still starting up must not deadlock when it sends
+ * the failure reply.
+ */
+ @Test(timeout = 600_000)
+ public void failedMessageReceivedBeforeStartupShouldNotDeadlock() {
+
+ VM vm0 = VM.getVM(0);
+ VM vm1 = VM.getVM(1);
+
+
+ // Install a membership listener which will send a message to
+ // any new member that joins. The message will fail to deserialize,
+ // triggering a failure reply.
+ vm0.invoke(() -> {
+ InternalDistributedSystem system = getSystem();
+ DistributionManager dm = system.getDM();
+ dm.addMembershipListener(new MembershipListener() {
+ @Override
+ public void memberJoined(DistributionManager distributionManager,
+ InternalDistributedMember id) {
+ FunctionService.onMember(id).execute(new
FailDeserializationFunction());
+ }
+ });
+ });
+
+ vm1.invoke(() -> {
+ // The SerializationException caused by the failed deserialization is expected here.
+ IgnoredException.addIgnoredException(SerializationException.class);
+
+ // Join the system. This will trigger the above membership listener. If
+ // the failed serialization causes a deadlock, this method will hang until
+ // the test timeout above expires.
+ getSystem();
+ });
+ }
+
/**
* Tests that configuring a distributed system with a cache-xml-file of ""
does not initialize a
* cache.
@@ -423,4 +463,27 @@ public class DistributedSystemDUnitTest extends
JUnit4DistributedTestCase {
}
}
+ /**
+ * A function that can be sent but never deserialized: {@code fromData} always
+ * throws {@link ClassNotFoundException}, so the receiving member fails to read
+ * the message, which triggers a failure reply.
+ */
+ public static class FailDeserializationFunction
+ implements org.apache.geode.cache.execute.Function,
+ DataSerializable {
+ @Override
+ public void execute(FunctionContext context) {
+ // Intentionally empty; the test only depends on deserialization failing.
+ }
+
+
+ @Override
+ public void toData(DataOutput out) throws IOException {
+ // Writes nothing; fromData fails on the receiver regardless of the payload.
+ }
+
+ @Override
+ public void fromData(DataInput in) throws IOException,
ClassNotFoundException {
+ // Simulate a class that is missing on the receiving side.
+ throw new ClassNotFoundException("Fake class not found");
+
+ }
+ }
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 9cde2f5..83f2886 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -1944,14 +1944,14 @@ public class Connection implements Runnable {
}
+ /**
+ * Sends a ReplyException built from exMsg and ex to the remote member of this
+ * connection. With directAck the reply is sent immediately through a
+ * DirectReplySender wrapping this connection; otherwise it is sent through the
+ * distribution manager, and only when rpId is non-zero.
+ */
private void sendFailureReply(int rpId, String exMsg, Throwable ex, boolean
directAck) {
- ReplySender dm = null;
+ ReplyException exception = new ReplyException(exMsg, ex);
if (directAck) {
- dm = new DirectReplySender(this);
+ ReplySender dm = new DirectReplySender(this);
+ ReplyMessage.send(getRemoteAddress(), rpId, exception, dm);
} else if (rpId != 0) {
- dm = this.owner.getDM();
- }
- if (dm != null) {
- ReplyMessage.send(getRemoteAddress(), rpId, new ReplyException(exMsg,
ex), dm);
+ DistributionManager dm = this.owner.getDM();
+ // Send from the DM's waiting thread pool rather than the calling thread:
+ // a P2P reader thread that blocks here during startup would deadlock,
+ // because startup cannot finish until that reader reads the startup response.
+ dm.getWaitingThreadPool()
+ .execute(() -> ReplyMessage.send(getRemoteAddress(), rpId,
exception, dm));
}
}