This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.12 by this push:
new dff5b2b GEODE-8238: message loss during shutdown in Shutdown Hook
when JVM exits (#5232)
dff5b2b is described below
commit dff5b2b938b73455426db8ad895a84d2a8599538
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Wed Jun 10 11:34:25 2020 -0700
GEODE-8238: message loss during shutdown in Shutdown Hook when JVM exits
(#5232)
Remove invocation of removeEndpoint when a shared/unordered connection
shuts down. Endpoint cleanup is already initiated by DistributionImpl
during membership view installation, so it isn't needed here.
(cherry picked from commit ece3a5a6c045075509a5097054e33a64d4194fae)
Also resolved compilation errors after the cherry-pick.
---
.../geode/internal/tcp/TCPConduitDUnitTest.java | 22 ++++++++++++++++++++++
.../distributed/internal/DistributionImpl.java | 4 ++++
.../distributed/internal/direct/DirectChannel.java | 6 +-----
.../org/apache/geode/internal/tcp/Connection.java | 7 ++++---
.../apache/geode/internal/tcp/ConnectionTable.java | 8 ++++++++
.../org/apache/geode/internal/tcp/TCPConduit.java | 2 +-
6 files changed, 40 insertions(+), 9 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
index 9462c72..698e7fe 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
@@ -16,6 +16,7 @@ package org.apache.geode.internal.tcp;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static
org.apache.geode.test.util.ResourceUtils.createTempFileFromResource;
+import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
import java.io.IOException;
@@ -30,8 +31,10 @@ import org.junit.runners.Parameterized;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.Locator;
+import org.apache.geode.distributed.internal.DistributionImpl;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.SerialAckedMessage;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.test.dunit.DistributedTestCase;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.IgnoredException;
@@ -93,6 +96,25 @@ public class TCPConduitDUnitTest extends DistributedTestCase
{
Thread.sleep(5000);
+ // ensure that the closing of a shared/unordered connection to another node does not
+ // remove all connections for that node
+ InternalDistributedMember otherMember =
+ (InternalDistributedMember)
system.getAllOtherMembers().iterator().next();
+ DistributionImpl distribution =
+ (DistributionImpl) system.getDistributionManager().getDistribution();
+ final ConnectionTable connectionTable =
+ distribution.getDirectChannel().getConduit().getConTable();
+
+ assertThat(connectionTable.hasReceiversFor(otherMember)).isTrue();
+
+ Connection sharedUnordered = connectionTable.get(otherMember, false,
+ System.currentTimeMillis(), 15000, 0);
+ sharedUnordered.requestClose("for testing");
+ // the sender connection has been closed so we should only have 2 senders now
+ assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(2);
+ // there should still be receivers for the other member - endpoint not removed!
+ assertThat(connectionTable.hasReceiversFor(otherMember)).isTrue();
+
try {
await("for message to be sent").until(() -> {
final SerialAckedMessage serialAckedMessage = new SerialAckedMessage();
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
index a89d23f..2fad2e6 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
@@ -794,6 +794,10 @@ public class DistributionImpl implements Distribution {
return result;
}
+ public DirectChannel getDirectChannel() {
+ return directChannel;
+ }
+
/**
* Insert our own MessageReceiver between us and the direct channel, in order to correctly filter
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index 4799270..bc68f8b 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -267,6 +267,7 @@ public class DirectChannel {
final List cons = new ArrayList(destinations.length);
ConnectExceptions ce = getConnections(mgr, msg, destinations,
orderedMsg, retry, ackTimeout,
ackSDTimeout, cons);
+
if (directReply && msg.getProcessorId() > 0) { // no longer a
direct-reply message?
directReply = false;
}
@@ -690,11 +691,6 @@ public class DirectChannel {
}
}
- public void closeEndpoint(InternalDistributedMember member, String reason) {
- closeEndpoint(member, reason, true);
- }
-
-
/**
* Closes any connections used to communicate with the given jgroupsAddress.
*/
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index a25e40c..ee9892b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -1249,7 +1249,7 @@ public class Connection implements Runnable {
* Invoking this method ensures that the proper synchronization is done.
*/
void requestClose(String reason) {
- close(reason, true, true, false, false);
+ close(reason, true, false, false, false);
}
boolean isClosing() {
@@ -3214,8 +3214,9 @@ public class Connection implements Runnable {
@Override
public String toString() {
- return String.valueOf(remoteAddr) + '@' + uniqueId
- + (remoteVersion != null ? '(' + remoteVersion.toString() + ')' : "");
+ return remoteAddr + "(uid=" + uniqueId + ")"
+ + (remoteVersion != null && remoteVersion != Version.CURRENT
+ ? "(v" + remoteVersion.toString() + ')' : "");
}
/**
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 6f5bf47..c4796a4 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -170,6 +170,14 @@ public class ConnectionTable {
threadWantsOwnResources.set(Boolean.TRUE);
}
+ public static int getNumSenderSharedConnections() {
+ ConnectionTable ct = (ConnectionTable) lastInstance.get();
+ if (ct == null) {
+ return 0;
+ }
+ return (ct.getConduit().getStats().getSendersSU());
+ }
+
/**
* Returns true if calling thread owns its own communication resources.
*/
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index e10b41d..c230266 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -595,7 +595,7 @@ public class TCPConduit implements Runnable {
}
}
- private ConnectionTable getConTable() {
+ ConnectionTable getConTable() {
ConnectionTable result = conTable;
if (result == null) {
stopper.checkCancelInProgress(null);