This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-3948b
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 7536f2e56a73610c1c694b4c9648107405ed735a
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Thu Apr 11 15:30:43 2019 -0700

    GEODE-3948 fixing handling of sotimeout in Message.receive()
    
    Changes made for GEODE-3948 caused the Message.receive() method to block
    indefinitely waiting for a response.  That wasn't the intent - it should
    honor the current setting of sotimeout.
---
 .../geode/internal/cache/tier/sockets/Message.java | 26 ++++++++----
 .../cache/tier/sockets/MessageJUnitTest.java       | 46 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 8 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
index b9c4a03..40d09e0 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
@@ -111,8 +111,6 @@ public class Message {
   @Immutable
   private static final byte[] FALSE = defineFalse();
 
-  private static final int NO_HEADER_READ_TIMEOUT = 0;
-
   private static byte[] defineTrue() {
     try (HeapDataOutputStream hdos = new HeapDataOutputStream(10, null)) {
       BlobHelper.serializeTo(Boolean.TRUE, hdos);
@@ -665,16 +663,22 @@ public class Message {
     cb.clear();
   }
 
-  private void readHeaderAndBody(int headerReadTimeoutMillis) throws 
IOException {
+  private void readHeaderAndBody(boolean setHeaderReadTimeout, int 
headerReadTimeoutMillis)
+      throws IOException {
     clearParts();
     // TODO: for server changes make sure sc is not null as this class also 
used by client
 
-    int timeout = socket.getSoTimeout();
-    try {
+    int oldTimeout = -1;
+    if (setHeaderReadTimeout) {
+      oldTimeout = socket.getSoTimeout();
       socket.setSoTimeout(headerReadTimeoutMillis);
+    }
+    try {
       fetchHeader();
     } finally {
-      socket.setSoTimeout(timeout);
+      if (setHeaderReadTimeout) {
+        socket.setSoTimeout(oldTimeout);
+      }
     }
 
     final ByteBuffer cb = getCommBuffer();
@@ -1133,7 +1137,7 @@ public class Message {
   public void receiveWithHeaderReadTimeout(int timeoutMillis) throws 
IOException {
     if (this.socket != null) {
       synchronized (getCommBuffer()) {
-        readHeaderAndBody(timeoutMillis);
+        readHeaderAndBody(true, timeoutMillis);
       }
     } else {
       throw new IOException("Dead Connection");
@@ -1144,7 +1148,13 @@ public class Message {
    * Populates the state of this {@code Message} with information received via 
its socket
    */
   public void receive() throws IOException {
-    receiveWithHeaderReadTimeout(NO_HEADER_READ_TIMEOUT);
+    if (this.socket != null) {
+      synchronized (getCommBuffer()) {
+        readHeaderAndBody(false, -1);
+      }
+    } else {
+      throw new IOException("Dead Connection");
+    }
   }
 
   public void receive(ServerConnection sc, int maxMessageLength, Semaphore 
dataLimiter,
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
index 7937cda..1eded76 100755
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
@@ -172,4 +172,50 @@ public class MessageJUnitTest {
       }
     }
   }
+
+  @Test(expected = SocketTimeoutException.class)
+  public void 
messageWillTimeoutDuringRecvOnInactiveSocketWithoutExplicitTimeoutSetting()
+      throws Exception {
+    final ServerSocket serverSocket = new ServerSocket();
+    serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0));
+    Thread serverThread = new Thread("acceptor thread") {
+      @Override
+      public void run() {
+        Socket client = null;
+        try {
+          client = serverSocket.accept();
+          Thread.sleep(12000);
+        } catch (InterruptedException e) {
+
+        } catch (IOException e) {
+
+        } finally {
+          if (client != null && !client.isClosed()) {
+            try {
+              client.close();
+            } catch (IOException e) {
+            }
+          }
+        }
+      }
+    };
+    serverThread.setDaemon(true);
+    serverThread.start();
+
+    try {
+      Socket socket = new Socket(serverSocket.getInetAddress(), 
serverSocket.getLocalPort());
+      socket.setSoTimeout(500);
+      MessageStats messageStats = mock(MessageStats.class);
+
+      message.setComms(socket, ByteBuffer.allocate(100), messageStats);
+      message.receive();
+
+    } finally {
+      serverThread.interrupt();
+      if (serverSocket != null && !serverSocket.isClosed()) {
+        serverSocket.close();
+      }
+    }
+  }
+
 }

Reply via email to