This is an automated email from the ASF dual-hosted git repository.
wirebaron pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new d3a9f75 GEODE-5268: fixing race condition in GMSHealthMonitor (#2005)
d3a9f75 is described below
commit d3a9f75157fac632ca0f82aab1426e2f9beab9ab
Author: Brian Rowe <[email protected]>
AuthorDate: Fri Jun 1 09:28:43 2018 -0700
GEODE-5268: fixing race condition in GMSHealthMonitor (#2005)
* GEODE-5268: fixing race condition in GMSHealthMonitor
---
.../membership/gms/fd/GMSHealthMonitor.java | 6 --
.../gms/fd/GMSHealthMonitorJUnitTest.java | 95 ++++++++++++++++++++++
2 files changed, 95 insertions(+), 6 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 9378ec6..13af6d4 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -731,7 +731,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
if (!ssocket.isClosed()) {
try {
ssocket.close();
- serverSocket = null;
logger.info("GMSHealthMonitor server socket closed.");
} catch (IOException e) {
logger.debug("Unexpected exception", e);
@@ -979,7 +978,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
if (serverSocket != null && !serverSocket.isClosed()) {
try {
serverSocket.close();
- serverSocket = null;
logger.info("GMSHealthMonitor server socket is closed in stopServices().");
} catch (IOException e) {
logger.trace("Unexpected exception", e);
@@ -994,10 +992,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
logger.info("GMSHealthMonitor serverSocketExecutor is "
+ (serverSocketExecutor.isTerminated() ? "terminated" : "not terminated"));
}
-
- // if (suspectRequestCollectorThread != null) {
- // suspectRequestCollectorThread.shutdown();
- // }
}
/***
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index 468943a..3ed2537 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -44,11 +44,15 @@ import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.awaitility.Awaitility;
import org.jgroups.util.UUID;
@@ -186,6 +190,16 @@ public class GMSHealthMonitorJUnitTest {
assertEquals(1, gmsHealthMonitor.getStats().getHeartbeatsSent());
}
+ @Test
+ public void testHMServiceHandlesShutdownRace() throws IOException {
+ // The health monitor starts a thread to monitor the tcp socket, both that thread and the
+ // stopServices call will attempt to shut down the socket during a normal close. This test tries
+ // to create a problematic ordering to make sure we still shutdown properly.
+ ((GMSHealthMonitorTest) gmsHealthMonitor).useBlockingSocket = true;
+ gmsHealthMonitor.started();
+ gmsHealthMonitor.stop();
+ }
+
/**
* checks who is next neighbor
*/
@@ -799,6 +813,8 @@ public class GMSHealthMonitorJUnitTest {
}
public class GMSHealthMonitorTest extends GMSHealthMonitor {
+ public boolean useBlockingSocket = false;
+
@Override
boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
if (useGMSHealthMonitorTestClass) {
@@ -809,5 +825,84 @@ public class GMSHealthMonitorJUnitTest {
}
return super.doTCPCheckMember(suspectMember, port);
}
+
+ @Override
+ ServerSocket createServerSocket(InetAddress socketAddress, int[] portRange) {
+ final ServerSocket serverSocket = super.createServerSocket(socketAddress, portRange);
+ if (useBlockingSocket) {
+ try {
+ return new TrickySocket(serverSocket);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ return serverSocket;
+ }
+ }
+ }
+
+ public class TrickySocket extends ServerSocket {
+ ServerSocket wrappedSocket;
+ final Lock lock = new ReentrantLock();
+ boolean firstWait = true;
+ final Condition block = lock.newCondition();
+
+ public TrickySocket(ServerSocket wrappee) throws IOException {
+ wrappedSocket = wrappee;
+ }
+
+ @Override
+ public void bind(SocketAddress endpoint) throws IOException {
+ wrappedSocket.bind(endpoint);
+ }
+
+ @Override
+ public void bind(SocketAddress endpoint, int backlog) throws IOException {
+ wrappedSocket.bind(endpoint, backlog);
+ }
+
+ @Override
+ public InetAddress getInetAddress() {
+ return wrappedSocket.getInetAddress();
+ }
+
+ @Override
+ public int getLocalPort() {
+ return wrappedSocket.getLocalPort();
+ }
+
+ @Override
+ public SocketAddress getLocalSocketAddress() {
+ return wrappedSocket.getLocalSocketAddress();
+ }
+
+ @Override
+ public Socket accept() throws IOException {
+ return wrappedSocket.accept();
+ }
+
+ @Override
+ public void close() throws IOException {
+ wrappedSocket.close();
+ lock.lock();
+ block.signal();
+ lock.unlock();
+ }
+
+ @Override
+ public boolean isClosed() {
+ final boolean closed = wrappedSocket.isClosed();
+ lock.lock();
+ if (firstWait) {
+ firstWait = false;
+ try {
+ block.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ lock.unlock();
+ return closed;
+ }
}
}
--
To stop receiving notification emails like this one, please contact
[email protected].