This is an automated email from the ASF dual-hosted git repository. bschuchardt pushed a commit to branch feature/GEODE-3948b in repository https://gitbox.apache.org/repos/asf/geode.git
commit 7536f2e56a73610c1c694b4c9648107405ed735a Author: Bruce Schuchardt <[email protected]> AuthorDate: Thu Apr 11 15:30:43 2019 -0700 GEODE-3948 fixing handling of sotimeout in Message.receive() Changes made for GEODE-3948 caused the Message.receive() method to block indefinitely waiting for a response. That wasn't the intent - it should honor the current setting of sotimeout. --- .../geode/internal/cache/tier/sockets/Message.java | 26 ++++++++---- .../cache/tier/sockets/MessageJUnitTest.java | 46 ++++++++++++++++++++++ 2 files changed, 64 insertions(+), 8 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java index b9c4a03..40d09e0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java @@ -111,8 +111,6 @@ public class Message { @Immutable private static final byte[] FALSE = defineFalse(); - private static final int NO_HEADER_READ_TIMEOUT = 0; - private static byte[] defineTrue() { try (HeapDataOutputStream hdos = new HeapDataOutputStream(10, null)) { BlobHelper.serializeTo(Boolean.TRUE, hdos); @@ -665,16 +663,22 @@ public class Message { cb.clear(); } - private void readHeaderAndBody(int headerReadTimeoutMillis) throws IOException { + private void readHeaderAndBody(boolean setHeaderReadTimeout, int headerReadTimeoutMillis) + throws IOException { clearParts(); // TODO: for server changes make sure sc is not null as this class also used by client - int timeout = socket.getSoTimeout(); - try { + int oldTimeout = -1; + if (setHeaderReadTimeout) { + oldTimeout = socket.getSoTimeout(); socket.setSoTimeout(headerReadTimeoutMillis); + } + try { fetchHeader(); } finally { - socket.setSoTimeout(timeout); + if (setHeaderReadTimeout) { + socket.setSoTimeout(oldTimeout); + } } final ByteBuffer cb = getCommBuffer(); @@ -1133,7 +1137,7 @@ public class Message { public void receiveWithHeaderReadTimeout(int timeoutMillis) throws IOException { if (this.socket != null) { synchronized (getCommBuffer()) { - readHeaderAndBody(timeoutMillis); + readHeaderAndBody(true, timeoutMillis); } } else { throw new IOException("Dead Connection"); @@ -1144,7 +1148,13 @@ public class Message { * Populates the state of this {@code Message} with information received via its socket */ public void receive() throws IOException { - receiveWithHeaderReadTimeout(NO_HEADER_READ_TIMEOUT); + if (this.socket != null) { + synchronized (getCommBuffer()) { + readHeaderAndBody(false, -1); + } + } else { + throw new IOException("Dead Connection"); + } } public void receive(ServerConnection sc, int maxMessageLength, Semaphore dataLimiter, diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java index 7937cda..1eded76 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java @@ -172,4 +172,50 @@ public class MessageJUnitTest { } } } + + @Test(expected = SocketTimeoutException.class) + public void messageWillTimeoutDuringRecvOnInactiveSocketWithoutExplicitTimeoutSetting() + throws Exception { + final ServerSocket serverSocket = new ServerSocket(); + serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0)); + Thread serverThread = new Thread("acceptor thread") { + @Override + public void run() { + Socket client = null; + try { + client = serverSocket.accept(); + Thread.sleep(12000); + } catch (InterruptedException e) { + + } catch (IOException e) { + + } finally { + if (client != null && !client.isClosed()) { + try { + client.close(); + } catch (IOException e) { + } + } + } + } + }; + serverThread.setDaemon(true); + serverThread.start(); + + try { + Socket socket = new Socket(serverSocket.getInetAddress(), serverSocket.getLocalPort()); + socket.setSoTimeout(500); + MessageStats messageStats = mock(MessageStats.class); + + message.setComms(socket, ByteBuffer.allocate(100), messageStats); + message.receive(); + + } finally { + serverThread.interrupt(); + if (serverSocket != null && !serverSocket.isClosed()) { + serverSocket.close(); + } + } + } + }
