This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new c841359 GEODE-7727: modify sender thread to detect release of connection (#4751)
c841359 is described below
commit c8413592e5573f675c538c63ef9ee9f97a349e73
Author: Mario Ivanac <[email protected]>
AuthorDate: Tue Mar 10 06:47:09 2020 +0100
GEODE-7727: modify sender thread to detect release of connection (#4751)
* GEODE-7727: modify sender thread to detect release of connection
* GEODE-7727: Update solution only for shared connections
* GEODE-7727: added test
* GEODE-7727: update after review comments
* GEODE-7727: update test
* GEODE-7727: fix for async write hanging
* GEODE-7727: Test of region operations in the face of closed connections
Adding a test for what happens to region operations when a connection is
closed out from under the system. This test hangs without the changes that
let the reader thread keep running.
Fix to test
* GEODE-7727: Preventing a double release of the input buffer
The releaseInputBuffer method was not thread safe. If it is called
concurrently, the buffer can be released twice, which adds it to the
buffer pool twice. Later, this could result in two threads using the
same buffer, corrupting its contents.
With the changes for GEODE-7727, we made it likely that releaseInputBuffer
would be called concurrently. If a member departs, one thread will call
Connection.close, which closes the socket and calls releaseInputBuffer.
However, closing the socket also wakes up the reader thread, which then
calls releaseInputBuffer concurrently.
Make releaseInputBuffer thread safe by introducing a lock.
* GEODE-7727: update after merge
* GEODE-7727: update test name
Co-authored-by: Dan Smith <[email protected]>
---
.../geode/internal/tcp/CloseConnectionTest.java | 76 ++++++++++++++++++++++
.../geode/internal/tcp/TCPConduitDUnitTest.java | 5 +-
...erStartupWhenAsyncDistributionTimeoutIsSet.java | 71 ++++++++++++++++++++
...butedSystemMXBeanWithAlertsDistributedTest.java | 1 +
.../distributed/internal/DistributionImpl.java | 4 ++
.../org/apache/geode/internal/tcp/Connection.java | 38 ++++++++---
.../apache/geode/internal/tcp/ConnectionTable.java | 14 +++-
.../org/apache/geode/internal/tcp/TCPConduit.java | 2 +-
8 files changed, 199 insertions(+), 12 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
new file mode 100644
index 0000000..154e908
--- /dev/null
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionImpl;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.CacheTestCase;
+
+public class CloseConnectionTest extends CacheTestCase {
+
+ @Test(timeout = 60_000)
+ public void sharedSenderShouldRecoverFromClosedSocket() {
+ VM vm0 = VM.getVM(0);
+ VM vm1 = VM.getVM(1);
+
+ // Create a region in each member. VM0 has a proxy region, so state must
be in VM1
+ vm0.invoke(() -> {
+
getCache().createRegionFactory(RegionShortcut.REPLICATE_PROXY).create("region");
+ });
+ vm1.invoke(() -> {
+
getCache().createRegionFactory(RegionShortcut.REPLICATE).create("region");
+ });
+
+
+ // Force VM1 to close it's connections.
+ vm1.invoke(() -> {
+ ConnectionTable conTable = getConnectionTable();
+ assertThat(conTable.getNumberOfReceivers()).isEqualTo(2);
+ conTable.closeReceivers(false);
+ assertThat(conTable.getNumberOfReceivers()).isEqualTo(0);
+ });
+
+ // See if VM0 noticed the closed connections. Try to do a couple of region
+ // operations
+ vm0.invoke(() -> {
+ Region<Object, Object> region = getCache().getRegion("region");
+ region.put("1", "1");
+
+ assertThat(region.get("1")).isEqualTo("1");
+ });
+
+ // Make sure connections were reestablished
+ vm1.invoke(() -> {
+ ConnectionTable conTable = getConnectionTable();
+ assertThat(conTable.getNumberOfReceivers()).isEqualTo(2);
+ });
+ }
+
+ private ConnectionTable getConnectionTable() {
+ ClusterDistributionManager cdm =
+ (ClusterDistributionManager) getSystem().getDistributionManager();
+ DistributionImpl distribution = (DistributionImpl) cdm.getDistribution();
+ return distribution.getDirectChannel().getConduit().getConTable();
+ }
+
+
+}
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
index 9462c72..9ffb76c 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
@@ -16,6 +16,7 @@ package org.apache.geode.internal.tcp;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static
org.apache.geode.test.util.ResourceUtils.createTempFileFromResource;
+import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
import java.io.IOException;
@@ -91,7 +92,9 @@ public class TCPConduitDUnitTest extends DistributedTestCase {
vm2.invoke(() -> startServer(properties));
vm3.invoke(() -> startServer(properties));
- Thread.sleep(5000);
+ await().untilAsserted(() -> {
+ assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(3);
+ });
try {
await("for message to be sent").until(() -> {
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TestServerStartupWhenAsyncDistributionTimeoutIsSet.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TestServerStartupWhenAsyncDistributionTimeoutIsSet.java
new file mode 100644
index 0000000..be57e26
--- /dev/null
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TestServerStartupWhenAsyncDistributionTimeoutIsSet.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.SerialAckedMessage;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+public class TestServerStartupWhenAsyncDistributionTimeoutIsSet implements
Serializable {
+ int serversToStart = 3;
+
+ protected static InternalDistributedSystem system =
+ InternalDistributedSystem.getConnectedInstance();
+
+ @Rule
+ public ClusterStartupRule cluster = new ClusterStartupRule(serversToStart +
1);
+
+ MemberVM locator;
+ MemberVM server1;
+ MemberVM server2;
+ MemberVM server3;
+
+ @Before
+ public void setUp() throws Exception {
+ locator = cluster.startLocatorVM(0);
+ }
+
+ private MemberVM startServer(final int vmIndex) {
+ return cluster.startServerVM(
+ vmIndex, s -> s.withConnectionToLocator(locator.getPort())
+ .withProperty("async-distribution-timeout", "5"));
+ }
+
+ @Test
+ public void testServerStartupDoesNotHangWhenAsyncDistributionTimeoutIsSet() {
+ server1 = startServer(1);
+ server2 = startServer(2);
+ server3 = startServer(3);
+ locator.invoke(() -> await().untilAsserted(() -> {
+ assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(3);
+ }));
+
+ locator.invoke(() -> await("for message to be sent").until(() -> {
+ final SerialAckedMessage serialAckedMessage = new SerialAckedMessage();
+ serialAckedMessage.send(system.getAllOtherMembers(), false);
+ return true;
+ }));
+ }
+}
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithAlertsDistributedTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithAlertsDistributedTest.java
index c549fe4..8aa3ada 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithAlertsDistributedTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithAlertsDistributedTest.java
@@ -137,6 +137,7 @@ public class
DistributedSystemMXBeanWithAlertsDistributedTest implements Seriali
memberVM3 = getVM(3);
managerMember = managerVM.invoke(() -> createManager());
+ IgnoredException.addIgnoredException("Cannot form connection to alert
listener");
for (VM memberVM : toArray(memberVM1, memberVM2, memberVM3)) {
memberVM.invoke(() -> {
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
index a89d23f..2fad2e6 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
@@ -794,6 +794,10 @@ public class DistributionImpl implements Distribution {
return result;
}
+ public DirectChannel getDirectChannel() {
+ return directChannel;
+ }
+
/**
* Insert our own MessageReceiver between us and the direct channel, in
order to correctly filter
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index ce83ad4..8c8a2fc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -311,6 +311,9 @@ public class Connection implements Runnable {
/** the buffer used for message receipt */
private ByteBuffer inputBuffer;
+ /** Lock used to protect the input buffer */
+ public final Object inputBufferLock = new Object();
+
/** the length of the next message to be dispatched */
private int messageLength;
@@ -339,6 +342,7 @@ public class Connection implements Runnable {
private boolean directAck;
+ private boolean asyncMode;
/** is this connection used for serial message delivery? */
boolean preserveOrder;
@@ -483,6 +487,7 @@ public class Connection implements Runnable {
handshakeRead = false;
handshakeCancelled = false;
connected = true;
+ asyncMode = false;
try {
socket.setTcpNoDelay(true);
@@ -1121,6 +1126,7 @@ public class Connection implements Runnable {
handshakeRead = false;
handshakeCancelled = false;
connected = true;
+ asyncMode = false;
uniqueId = ID_COUNTER.getAndIncrement();
@@ -1454,6 +1460,10 @@ public class Connection implements Runnable {
}
asyncClose(false);
owner.removeAndCloseThreadOwnedSockets();
+ } else {
+ if (sharedResource && !asyncMode) {
+ asyncClose(false);
+ }
}
releaseInputBuffer();
@@ -1468,10 +1478,12 @@ public class Connection implements Runnable {
}
private void releaseInputBuffer() {
- ByteBuffer tmp = inputBuffer;
- if (tmp != null) {
- inputBuffer = null;
- getBufferPool().releaseReceiveBuffer(tmp);
+ synchronized (inputBufferLock) {
+ ByteBuffer tmp = inputBuffer;
+ if (tmp != null) {
+ inputBuffer = null;
+ getBufferPool().releaseReceiveBuffer(tmp);
+ }
}
}
@@ -1593,10 +1605,9 @@ public class Connection implements Runnable {
}
return;
}
-
processInputBuffer();
- if (!isReceiver && (handshakeRead || handshakeCancelled)) {
+ if (!isHandShakeReader && !isReceiver && (handshakeRead ||
handshakeCancelled)) {
if (logger.isDebugEnabled()) {
if (handshakeRead) {
logger.debug("handshake has been read {}", this);
@@ -1605,8 +1616,13 @@ public class Connection implements Runnable {
}
}
isHandShakeReader = true;
- // Once we have read the handshake the reader can go away
- break;
+
+ // Once we have read the handshake for unshared connections, the
reader can skip
+ // processing messages
+ if (!sharedResource || asyncMode) {
+ break;
+ }
+
}
} catch (CancelException e) {
if (logger.isDebugEnabled()) {
@@ -1660,7 +1676,7 @@ public class Connection implements Runnable {
}
}
} finally {
- if (!isHandShakeReader) {
+ if (!isHandShakeReader || (sharedResource && !asyncMode)) {
synchronized (stateLock) {
connectionState = STATE_IDLE;
}
@@ -3069,6 +3085,10 @@ public class Connection implements Runnable {
remoteVersion = Version.readVersion(dis, true);
ioFilter.doneReading(peerDataBuffer);
notifyHandshakeWaiter(true);
+ if (preserveOrder && asyncDistributionTimeout != 0) {
+ asyncMode = true;
+ }
+
return;
default:
String err =
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 6f5bf47..0c098d1 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -37,6 +37,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.SystemFailure;
import org.apache.geode.alerting.internal.spi.AlertingAction;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
@@ -707,7 +708,7 @@ public class ConnectionTable {
*
* @param beingSick a test hook to simulate a sick process
*/
- private void closeReceivers(boolean beingSick) {
+ void closeReceivers(boolean beingSick) {
synchronized (receivers) {
for (Iterator it = receivers.iterator(); it.hasNext();) {
Connection con = (Connection) it.next();
@@ -917,6 +918,17 @@ public class ConnectionTable {
}
}
+ @VisibleForTesting
+ public static int getNumSenderSharedConnections() {
+ ConnectionTable ct = (ConnectionTable) lastInstance.get();
+ if (ct == null) {
+ return 0;
+ }
+ return (ct.getConduit().getStats().getSendersSU());
+ }
+
+
+
/**
* Clears lastInstance. Does not yet close underlying sockets, but probably
not strictly
* necessary.
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 6130420..6f5edda 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -613,7 +613,7 @@ public class TCPConduit implements Runnable {
}
}
- private ConnectionTable getConTable() {
+ ConnectionTable getConTable() {
ConnectionTable result = conTable;
if (result == null) {
stopper.checkCancelInProgress(null);