This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ae8cb0  [FLINK-24156][network] Guard against erroneous SocketTimeoutExceptions
5ae8cb0 is described below

commit 5ae8cb0503449b07f76d0ab621c3e81734496b26
Author: Ryan Scudellari <[email protected]>
AuthorDate: Mon Sep 20 04:48:13 2021 -0400

    [FLINK-24156][network] Guard against erroneous SocketTimeoutExceptions
    
    On JDK11 JDK-8237858 can cause SocketTimeoutExceptions to be thrown during
    ServerSocket.accept() calls, even when configured with an infinite timeout.
    This change works around this issue by catching these erroneous exceptions
    and retrying indefinitely.
---
 .../main/java/org/apache/flink/util/NetUtils.java  | 33 +++++++++++
 .../java/org/apache/flink/util/NetUtilsTest.java   | 65 ++++++++++++++++++++++
 .../test/socket/SocketWindowWordCountITCase.java   |  3 +-
 .../org/apache/flink/runtime/blob/BlobServer.java  |  3 +-
 .../runtime/blob/TestingFailingBlobServer.java     |  6 +-
 .../flink/runtime/net/ConnectionUtilsTest.java     |  4 +-
 .../apache/flink/runtime/rest/RestClientTest.java  |  9 ++-
 .../api/operators/collect/CollectSinkFunction.java |  3 +-
 .../experimental/SocketStreamIterator.java         |  3 +-
 .../api/functions/sink/SocketClientSinkTest.java   | 11 ++--
 .../source/SocketTextStreamFunctionTest.java       | 29 +++++-----
 .../CollectSinkOperatorCoordinatorTest.java        |  3 +-
 .../org/apache/flink/networking/EchoServer.java    |  2 +
 13 files changed, 145 insertions(+), 29 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index a079fd6..89b11df 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -32,6 +32,9 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketOptions;
+import java.net.SocketTimeoutException;
 import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.Arrays;
@@ -116,6 +119,36 @@ public class NetUtils {
         }
     }
 
+    /**
+     * Calls {@link ServerSocket#accept()} on the provided server socket, 
suppressing any thrown
+     * {@link SocketTimeoutException}s. This is a workaround for the 
underlying JDK-8237858 bug in
+     * JDK 11 that can cause errant SocketTimeoutExceptions to be thrown at 
unexpected times.
+     *
+     * <p>This method requires that the provided ServerSocket has no timeout set (SO_TIMEOUT of 0),
+     * indicating an infinite timeout. A ServerSocket with a non-zero timeout is rejected with an
+     * {@link IllegalArgumentException} before any accept call is made.
+     *
+     * @param serverSocket a ServerSocket with {@link SocketOptions#SO_TIMEOUT SO_TIMEOUT} set to 0;
+     *     must not be null; a non-zero SO_TIMEOUT causes an IllegalArgumentException to be thrown
+     *     instead of the accept call being attempted
+     * @return the new Socket
+     * @exception IOException see {@link ServerSocket#accept()}
+     * @see <a href="https://bugs.openjdk.java.net/browse/JDK-8237858">JDK-8237858</a>
+     */
+    public static Socket acceptWithoutTimeout(ServerSocket serverSocket) 
throws IOException {
+        Preconditions.checkArgument(
+                serverSocket.getSoTimeout() == 0, "serverSocket SO_TIMEOUT 
option must be 0");
+        while (true) {
+            try {
+                return serverSocket.accept();
+            } catch (SocketTimeoutException exception) {
+                // This should be impossible given that the socket timeout is set to zero,
+                // which means an infinite timeout; if it happens anyway, it is the JDK-8237858
+                // bug. We retry the accept call indefinitely to replicate the expected behavior.
+            }
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  Lookup of to free ports
     // ------------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java 
b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
index f54d715..5410694 100644
--- a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
@@ -21,8 +21,12 @@ package org.apache.flink.util;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.HashSet;
@@ -50,6 +54,67 @@ public class NetUtilsTest extends TestLogger {
     }
 
     @Test
+    public void testAcceptWithoutTimeoutSuppressesTimeoutException() throws 
IOException {
+        // Validates that acceptWithoutTimeout suppresses all 
SocketTimeoutExceptions
+        Socket expected = new Socket();
+        ServerSocket serverSocket =
+                new ServerSocket() {
+                    private int count = 0;
+
+                    @Override
+                    public Socket accept() throws IOException {
+                        if (count < 2) {
+                            count++;
+                            throw new SocketTimeoutException();
+                        }
+
+                        return expected;
+                    }
+                };
+
+        assertEquals(expected, NetUtils.acceptWithoutTimeout(serverSocket));
+    }
+
+    @Test
+    public void testAcceptWithoutTimeoutDefaultTimeout() throws IOException {
+        // Default timeout (should be zero)
+        final Socket expected = new Socket();
+        try (final ServerSocket serverSocket =
+                new ServerSocket(0) {
+                    @Override
+                    public Socket accept() {
+                        return expected;
+                    }
+                }) {
+            assertEquals(expected, 
NetUtils.acceptWithoutTimeout(serverSocket));
+        }
+    }
+
+    @Test
+    public void testAcceptWithoutTimeoutZeroTimeout() throws IOException {
+        // Explicitly sets a timeout of zero
+        final Socket expected = new Socket();
+        try (final ServerSocket serverSocket =
+                new ServerSocket(0) {
+                    @Override
+                    public Socket accept() {
+                        return expected;
+                    }
+                }) {
+            serverSocket.setSoTimeout(0);
+            assertEquals(expected, 
NetUtils.acceptWithoutTimeout(serverSocket));
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAcceptWithoutTimeoutRejectsSocketWithSoTimeout() throws 
IOException {
+        try (final ServerSocket serverSocket = new ServerSocket(0)) {
+            serverSocket.setSoTimeout(5);
+            NetUtils.acceptWithoutTimeout(serverSocket);
+        }
+    }
+
+    @Test
     public void testIPv4toURL() {
         try {
             final String addressString = "192.168.0.1";
diff --git 
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
index 8bc7ba6..7f27378 100644
--- 
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
+++ 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
@@ -21,6 +21,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.streaming.examples.socket.SocketWindowWordCount;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.NetUtils;
 
 import org.junit.Test;
 
@@ -138,7 +139,7 @@ public class SocketWindowWordCountITCase extends 
AbstractTestBase {
         @Override
         public void run() {
             try {
-                try (Socket socket = serverSocket.accept();
+                try (Socket socket = 
NetUtils.acceptWithoutTimeout(serverSocket);
                         PrintWriter writer = new 
PrintWriter(socket.getOutputStream(), true)) {
 
                     writer.println(WordCountData.TEXT);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 71d31a3..93aeab4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -264,7 +264,8 @@ public class BlobServer extends Thread
     public void run() {
         try {
             while (!this.shutdownRequested.get()) {
-                BlobServerConnection conn = new 
BlobServerConnection(serverSocket.accept(), this);
+                BlobServerConnection conn =
+                        new 
BlobServerConnection(NetUtils.acceptWithoutTimeout(serverSocket), this);
                 try {
                     synchronized (activeConnections) {
                         while (activeConnections.size() >= maxConnections) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
index 18c2725..dfeb125 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.NetUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -53,7 +54,8 @@ public class TestingFailingBlobServer extends BlobServer {
         // we do properly the first operation (PUT)
         try {
             for (int num = 0; num < numAccept && !isShutdown(); num++) {
-                new BlobServerConnection(getServerSocket().accept(), 
this).start();
+                new 
BlobServerConnection(NetUtils.acceptWithoutTimeout(getServerSocket()), this)
+                        .start();
             }
         } catch (Throwable t) {
             t.printStackTrace();
@@ -63,7 +65,7 @@ public class TestingFailingBlobServer extends BlobServer {
         for (int num = 0; num < numFailures && !isShutdown(); num++) {
             Socket socket = null;
             try {
-                socket = getServerSocket().accept();
+                socket = NetUtils.acceptWithoutTimeout(getServerSocket());
                 InputStream is = socket.getInputStream();
                 OutputStream os = socket.getOutputStream();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
index e38b8e4..22655ec 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.net;
 
+import org.apache.flink.util.NetUtils;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mockito;
@@ -78,7 +80,7 @@ public class ConnectionUtilsTest {
                                 @Override
                                 public void run() {
                                     try {
-                                        socket.accept();
+                                        NetUtils.acceptWithoutTimeout(socket);
                                     } catch (IOException e) {
                                         // ignore
                                     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index 3b74857..06aad7e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.testutils.TestingUtils;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.function.CheckedSupplier;
@@ -110,7 +111,9 @@ public class RestClientTest extends TestLogger {
 
             // start server
             final CompletableFuture<Socket> socketCompletableFuture =
-                    
CompletableFuture.supplyAsync(CheckedSupplier.unchecked(serverSocket::accept));
+                    CompletableFuture.supplyAsync(
+                            CheckedSupplier.unchecked(
+                                    () -> 
NetUtils.acceptWithoutTimeout(serverSocket)));
 
             final CompletableFuture<EmptyResponseBody> responseFuture =
                     restClient.sendRequest(
@@ -162,7 +165,9 @@ public class RestClientTest extends TestLogger {
 
             // start server
             final CompletableFuture<Socket> socketCompletableFuture =
-                    
CompletableFuture.supplyAsync(CheckedSupplier.unchecked(serverSocket::accept));
+                    CompletableFuture.supplyAsync(
+                            CheckedSupplier.unchecked(
+                                    () -> 
NetUtils.acceptWithoutTimeout(serverSocket)));
 
             final CompletableFuture<EmptyResponseBody> responseFuture =
                     restClient.sendRequest(
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
index fe810d5..042bbb8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -382,7 +383,7 @@ public class CollectSinkFunction<IN> extends 
RichSinkFunction<IN>
                 try {
                     if (connection == null) {
                         // waiting for coordinator to connect
-                        connection = serverSocket.accept();
+                        connection = 
NetUtils.acceptWithoutTimeout(serverSocket);
                         inStream = new 
DataInputViewStreamWrapper(this.connection.getInputStream());
                         outStream =
                                 new 
DataOutputViewStreamWrapper(this.connection.getOutputStream());
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
index 5df5f69..69a72c3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.experimental;
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.NetUtils;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -171,7 +172,7 @@ public class SocketStreamIterator<T> implements Iterator<T> 
{
     private T readNextFromStream() throws Exception {
         try {
             if (inStream == null) {
-                connectedSocket = socket.accept();
+                connectedSocket = NetUtils.acceptWithoutTimeout(socket);
                 inStream = new 
DataInputViewStreamWrapper(connectedSocket.getInputStream());
             }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
index a8287a5..28e3e0d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.sink;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.commons.io.IOUtils;
@@ -88,7 +89,7 @@ public class SocketClientSinkTest extends TestLogger {
 
         sinkRunner.start();
 
-        Socket sk = server.accept();
+        Socket sk = NetUtils.acceptWithoutTimeout(server);
         BufferedReader rdr = new BufferedReader(new 
InputStreamReader(sk.getInputStream()));
 
         String value = rdr.readLine();
@@ -132,7 +133,7 @@ public class SocketClientSinkTest extends TestLogger {
 
         sinkRunner.start();
 
-        Socket sk = server.accept();
+        Socket sk = NetUtils.acceptWithoutTimeout(server);
         BufferedReader rdr = new BufferedReader(new 
InputStreamReader(sk.getInputStream()));
         String value = rdr.readLine();
 
@@ -163,7 +164,7 @@ public class SocketClientSinkTest extends TestLogger {
                         @Override
                         public void run() {
                             try {
-                                Socket sk = server.accept();
+                                Socket sk = 
NetUtils.acceptWithoutTimeout(server);
                                 sk.close();
                             } catch (Throwable t) {
                                 error.set(t);
@@ -221,7 +222,7 @@ public class SocketClientSinkTest extends TestLogger {
                     new Callable<Void>() {
                         @Override
                         public Void call() throws Exception {
-                            Socket socket = serverSocket[0].accept();
+                            Socket socket = 
NetUtils.acceptWithoutTimeout(serverSocket[0]);
 
                             BufferedReader reader =
                                     new BufferedReader(
@@ -296,7 +297,7 @@ public class SocketClientSinkTest extends TestLogger {
                 throw new AssumptionViolatedException(
                         "Could not bind server to previous port.", be);
             }
-            Socket socket = serverSocket[0].accept();
+            Socket socket = NetUtils.acceptWithoutTimeout(serverSocket[0]);
 
             BufferedReader reader =
                     new BufferedReader(new 
InputStreamReader(socket.getInputStream()));
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
index 6b682c1..1354384 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.source;
 
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.NetUtils;
 
 import org.apache.commons.io.IOUtils;
 import org.junit.Test;
@@ -50,7 +51,7 @@ public class SocketTextStreamFunctionTest {
             SocketSourceThread runner = new SocketSourceThread(source, 
"test1", "check");
             runner.start();
 
-            channel = server.accept();
+            channel = NetUtils.acceptWithoutTimeout(server);
             OutputStreamWriter writer = new 
OutputStreamWriter(channel.getOutputStream());
 
             writer.write("test1\n");
@@ -84,7 +85,7 @@ public class SocketTextStreamFunctionTest {
             SocketSourceThread runner = new SocketSourceThread(source);
             runner.start();
 
-            channel = server.accept();
+            channel = NetUtils.acceptWithoutTimeout(server);
             channel.close();
 
             try {
@@ -113,22 +114,22 @@ public class SocketTextStreamFunctionTest {
             runner.start();
 
             // first connection: nothing
-            channel = server.accept();
+            channel = NetUtils.acceptWithoutTimeout(server);
             channel.close();
 
             // second connection: first string
-            channel = server.accept();
+            channel = NetUtils.acceptWithoutTimeout(server);
             OutputStreamWriter writer = new 
OutputStreamWriter(channel.getOutputStream());
             writer.write("test1\n");
             writer.close();
             channel.close();
 
             // third connection: nothing
-            channel = server.accept();
+            channel = NetUtils.acceptWithoutTimeout(server);
             channel.close();
 
             // forth connection: second string
-            channel = server.accept();
+            channel = NetUtils.acceptWithoutTimeout(server);
             writer = new OutputStreamWriter(channel.getOutputStream());
             writer.write("check\n");
             writer.flush();
@@ -157,22 +158,22 @@ public class SocketTextStreamFunctionTest {
             runner.start();
 
             // first connection: nothing
-            channel = server.accept();
+            channel = NetUtils.acceptWithoutTimeout(server);
             channel.close();
 
             // second connection: first string
-            channel = server.accept();
+            channel = NetUtils.acceptWithoutTimeout(server);
             OutputStreamWriter writer = new 
OutputStreamWriter(channel.getOutputStream());
             writer.write("test1\n");
             writer.close();
             channel.close();
 
             // third connection: nothing
-            channel = server.accept();
+            channel = NetUtils.acceptWithoutTimeout(server);
             channel.close();
 
             // forth connection: second string
-            channel = server.accept();
+            channel = NetUtils.acceptWithoutTimeout(server);
             writer = new OutputStreamWriter(channel.getOutputStream());
             writer.write("check\n");
             writer.flush();
@@ -201,22 +202,22 @@ public class SocketTextStreamFunctionTest {
             runner.start();
 
             // first connection: nothing
-            channel = server.accept();
+            channel = NetUtils.acceptWithoutTimeout(server);
             channel.close();
 
             // second connection: first string
-            channel = server.accept();
+            channel = NetUtils.acceptWithoutTimeout(server);
             OutputStreamWriter writer = new 
OutputStreamWriter(channel.getOutputStream());
             writer.write("te");
             writer.close();
             channel.close();
 
             // third connection: nothing
-            channel = server.accept();
+            channel = NetUtils.acceptWithoutTimeout(server);
             channel.close();
 
             // forth connection: second string
-            channel = server.accept();
+            channel = NetUtils.acceptWithoutTimeout(server);
             writer = new OutputStreamWriter(channel.getOutputStream());
             writer.write("st1\n");
             writer.write("check1\n");
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
index 8f0cc5d..2a16512 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.streaming.api.operators.collect.utils.CollectTestUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.NetUtils;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -165,7 +166,7 @@ public class CollectSinkOperatorCoordinatorTest {
             try {
                 while (running) {
                     if (socket == null) {
-                        socket = server.accept();
+                        socket = NetUtils.acceptWithoutTimeout(server);
                         inStream = new 
DataInputViewStreamWrapper(socket.getInputStream());
                         outStream = new 
DataOutputViewStreamWrapper(socket.getOutputStream());
                     }
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
 
b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
index 00a85f8..8835a66 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
@@ -51,6 +51,8 @@ public class EchoServer extends Thread implements 
AutoCloseable {
     public void run() {
         while (!close) {
             try {
+                // We are NOT using NetUtils.acceptWithoutTimeout here as this 
ServerSocket sets
+                // a timeout.
                 EchoWorkerThread thread =
                         new EchoWorkerThread(serverSocket.accept(), 
socketTimeout);
                 thread.start();

Reply via email to