This is an automated email from the ASF dual-hosted git repository.

onichols pushed a commit to branch release/1.10.0
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/release/1.10.0 by this push:
     new 6dde1cd  GEODE-7055: Don't send failure replies from a P2P reader 
thread
6dde1cd is described below

commit 6dde1cdeffcdd169f26242c5a9ffcc2b40374e0b
Author: Dan Smith <[email protected]>
AuthorDate: Wed Aug 7 11:58:59 2019 -0700

    GEODE-7055: Don't send failure replies from a P2P reader thread
    
    We were hitting a deadlock during startup if a P2P reader thread tried
    to send a failure reply - it would block waiting for startup to finish,
    but startup would not finish until the P2P reader thread could read a
    startup response.
    
    Send the failure reply in a separate thread, to make sure we always
    unblock the P2P reader thread to read new messages.
    
    (cherry picked from commit 1438b56bf7ef44e758bb4fc157dfca2cff4e2c99)
---
 .../distributed/DistributedSystemDUnitTest.java    | 63 ++++++++++++++++++++++
 .../org/apache/geode/internal/tcp/Connection.java  | 12 ++---
 2 files changed, 69 insertions(+), 6 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/distributed/DistributedSystemDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/distributed/DistributedSystemDUnitTest.java
index 4a0401c..d2fa9d7 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/distributed/DistributedSystemDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/distributed/DistributedSystemDUnitTest.java
@@ -41,6 +41,9 @@ import static 
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.net.Inet4Address;
 import java.net.Inet6Address;
 import java.net.InetAddress;
@@ -57,14 +60,20 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.CancelException;
+import org.apache.geode.DataSerializable;
 import org.apache.geode.GemFireConfigException;
+import org.apache.geode.SerializationException;
 import org.apache.geode.SystemConnectException;
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.MembershipListener;
 import org.apache.geode.distributed.internal.SerialDistributionMessage;
 import org.apache.geode.distributed.internal.SizeableRunnable;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -321,6 +330,37 @@ public class DistributedSystemDUnitTest extends 
JUnit4DistributedTestCase {
     });
   }
 
+  @Test(timeout = 600_000)
+  public void failedMessageReceivedBeforeStartupShouldNotDeadlock() {
+
+    VM vm0 = VM.getVM(0);
+    VM vm1 = VM.getVM(1);
+
+
+    // Install a membership listener which will send a message to
+    // any new member that joins. The message will fail to deserialize, 
triggering
+    // a failure reply
+    vm0.invoke(() -> {
+      InternalDistributedSystem system = getSystem();
+      DistributionManager dm = system.getDM();
+      dm.addMembershipListener(new MembershipListener() {
+        @Override
+        public void memberJoined(DistributionManager distributionManager,
+            InternalDistributedMember id) {
+          FunctionService.onMember(id).execute(new 
FailDeserializationFunction());
+        }
+      });
+    });
+
+    vm1.invoke(() -> {
+      IgnoredException.addIgnoredException(SerializationException.class);
+
+      // Join the system. This will trigger the above membership listener. 
If
+      // the failed serialization causes a deadlock, this method will hang
+      getSystem();
+    });
+  }
+
   /**
    * Tests that configuring a distributed system with a cache-xml-file of "" 
does not initialize a
    * cache.
@@ -423,4 +463,27 @@ public class DistributedSystemDUnitTest extends 
JUnit4DistributedTestCase {
     }
   }
 
+  /**
+   * A function whose deserialization always fails, used to test failure-reply handling
+   */
+  public static class FailDeserializationFunction
+      implements org.apache.geode.cache.execute.Function,
+      DataSerializable {
+    @Override
+    public void execute(FunctionContext context) {
+
+    }
+
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+      throw new ClassNotFoundException("Fake class not found");
+
+    }
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 9cde2f5..83f2886 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -1944,14 +1944,14 @@ public class Connection implements Runnable {
   }
 
   private void sendFailureReply(int rpId, String exMsg, Throwable ex, boolean 
directAck) {
-    ReplySender dm = null;
+    ReplyException exception = new ReplyException(exMsg, ex);
     if (directAck) {
-      dm = new DirectReplySender(this);
+      ReplySender dm = new DirectReplySender(this);
+      ReplyMessage.send(getRemoteAddress(), rpId, exception, dm);
     } else if (rpId != 0) {
-      dm = this.owner.getDM();
-    }
-    if (dm != null) {
-      ReplyMessage.send(getRemoteAddress(), rpId, new ReplyException(exMsg, 
ex), dm);
+      DistributionManager dm = this.owner.getDM();
+      dm.getWaitingThreadPool()
+          .execute(() -> ReplyMessage.send(getRemoteAddress(), rpId, 
exception, dm));
     }
   }
 

Reply via email to