This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5ae8cb0 [FLINK-24156][network] Guard against erroneous
SocketTimeoutExceptions
5ae8cb0 is described below
commit 5ae8cb0503449b07f76d0ab621c3e81734496b26
Author: Ryan Scudellari <[email protected]>
AuthorDate: Mon Sep 20 04:48:13 2021 -0400
[FLINK-24156][network] Guard against erroneous SocketTimeoutExceptions
On JDK 11, the bug JDK-8237858 can cause SocketTimeoutExceptions to be thrown
during ServerSocket.accept() calls, even when the socket is configured with an
infinite timeout. This change works around the issue by catching these
erroneous exceptions and retrying the accept() call indefinitely.
---
.../main/java/org/apache/flink/util/NetUtils.java | 33 +++++++++++
.../java/org/apache/flink/util/NetUtilsTest.java | 65 ++++++++++++++++++++++
.../test/socket/SocketWindowWordCountITCase.java | 3 +-
.../org/apache/flink/runtime/blob/BlobServer.java | 3 +-
.../runtime/blob/TestingFailingBlobServer.java | 6 +-
.../flink/runtime/net/ConnectionUtilsTest.java | 4 +-
.../apache/flink/runtime/rest/RestClientTest.java | 9 ++-
.../api/operators/collect/CollectSinkFunction.java | 3 +-
.../experimental/SocketStreamIterator.java | 3 +-
.../api/functions/sink/SocketClientSinkTest.java | 11 ++--
.../source/SocketTextStreamFunctionTest.java | 29 +++++-----
.../CollectSinkOperatorCoordinatorTest.java | 3 +-
.../org/apache/flink/networking/EchoServer.java | 2 +
13 files changed, 145 insertions(+), 29 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index a079fd6..89b11df 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -32,6 +32,9 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketOptions;
+import java.net.SocketTimeoutException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.Arrays;
@@ -116,6 +119,36 @@ public class NetUtils {
}
}
+ /**
+ * Calls {@link ServerSocket#accept()} on the provided server socket,
suppressing any thrown
+ * {@link SocketTimeoutException}s. This is a workaround for the
underlying JDK-8237858 bug in
+ * JDK 11 that can cause errant SocketTimeoutExceptions to be thrown at
unexpected times.
+ *
+ * <p>This method requires the provided ServerSocket to have no timeout set
(SO_TIMEOUT of 0),
+ * indicating an infinite timeout. A ServerSocket with a non-zero timeout is
rejected with an
+ * {@link IllegalArgumentException} before {@code accept()} is called.
+ *
+ * @param serverSocket a ServerSocket with {@link SocketOptions#SO_TIMEOUT
SO_TIMEOUT} set to 0;
+ * must not be null
+ * @return the new Socket
+ * @exception IOException see {@link ServerSocket#accept()}
+ * @see <a
href="https://bugs.openjdk.java.net/browse/JDK-8237858">JDK-8237858</a>
+ */
+ public static Socket acceptWithoutTimeout(ServerSocket serverSocket)
throws IOException {
+ Preconditions.checkArgument(
+ serverSocket.getSoTimeout() == 0, "serverSocket SO_TIMEOUT
option must be 0");
+ while (true) {
+ try {
+ return serverSocket.accept();
+ } catch (SocketTimeoutException exception) {
+ // This should be impossible given that the socket timeout is
set to zero
+ // which indicates an infinite timeout. This is due to the
underlying JDK-8237858
+ // bug. We retry the accept call indefinitely to replicate the
expected behavior.
+ }
+ }
+ }
+
// ------------------------------------------------------------------------
// Lookup of free ports
// ------------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
index f54d715..5410694 100644
--- a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
@@ -21,8 +21,12 @@ package org.apache.flink.util;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.HashSet;
@@ -50,6 +54,67 @@ public class NetUtilsTest extends TestLogger {
}
@Test
+ public void testAcceptWithoutTimeoutSuppressesTimeoutException() throws
IOException {
+ // Validates that acceptWithoutTimeout suppresses all
SocketTimeoutExceptions
+ Socket expected = new Socket();
+ ServerSocket serverSocket =
+ new ServerSocket() {
+ private int count = 0;
+
+ @Override
+ public Socket accept() throws IOException {
+ if (count < 2) {
+ count++;
+ throw new SocketTimeoutException();
+ }
+
+ return expected;
+ }
+ };
+
+ assertEquals(expected, NetUtils.acceptWithoutTimeout(serverSocket));
+ }
+
+ @Test
+ public void testAcceptWithoutTimeoutDefaultTimeout() throws IOException {
+ // Default timeout (should be zero)
+ final Socket expected = new Socket();
+ try (final ServerSocket serverSocket =
+ new ServerSocket(0) {
+ @Override
+ public Socket accept() {
+ return expected;
+ }
+ }) {
+ assertEquals(expected,
NetUtils.acceptWithoutTimeout(serverSocket));
+ }
+ }
+
+ @Test
+ public void testAcceptWithoutTimeoutZeroTimeout() throws IOException {
+ // Explicitly sets a timeout of zero
+ final Socket expected = new Socket();
+ try (final ServerSocket serverSocket =
+ new ServerSocket(0) {
+ @Override
+ public Socket accept() {
+ return expected;
+ }
+ }) {
+ serverSocket.setSoTimeout(0);
+ assertEquals(expected,
NetUtils.acceptWithoutTimeout(serverSocket));
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testAcceptWithoutTimeoutRejectsSocketWithSoTimeout() throws
IOException {
+ try (final ServerSocket serverSocket = new ServerSocket(0)) {
+ serverSocket.setSoTimeout(5);
+ NetUtils.acceptWithoutTimeout(serverSocket);
+ }
+ }
+
+ @Test
public void testIPv4toURL() {
try {
final String addressString = "192.168.0.1";
diff --git
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
index 8bc7ba6..7f27378 100644
---
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
+++
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
@@ -21,6 +21,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.examples.socket.SocketWindowWordCount;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.NetUtils;
import org.junit.Test;
@@ -138,7 +139,7 @@ public class SocketWindowWordCountITCase extends
AbstractTestBase {
@Override
public void run() {
try {
- try (Socket socket = serverSocket.accept();
+ try (Socket socket =
NetUtils.acceptWithoutTimeout(serverSocket);
PrintWriter writer = new
PrintWriter(socket.getOutputStream(), true)) {
writer.println(WordCountData.TEXT);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 71d31a3..93aeab4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -264,7 +264,8 @@ public class BlobServer extends Thread
public void run() {
try {
while (!this.shutdownRequested.get()) {
- BlobServerConnection conn = new
BlobServerConnection(serverSocket.accept(), this);
+ BlobServerConnection conn =
+ new
BlobServerConnection(NetUtils.acceptWithoutTimeout(serverSocket), this);
try {
synchronized (activeConnections) {
while (activeConnections.size() >= maxConnections) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
index 18c2725..dfeb125 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.blob;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.NetUtils;
import java.io.IOException;
import java.io.InputStream;
@@ -53,7 +54,8 @@ public class TestingFailingBlobServer extends BlobServer {
// we do properly the first operation (PUT)
try {
for (int num = 0; num < numAccept && !isShutdown(); num++) {
- new BlobServerConnection(getServerSocket().accept(),
this).start();
+ new
BlobServerConnection(NetUtils.acceptWithoutTimeout(getServerSocket()), this)
+ .start();
}
} catch (Throwable t) {
t.printStackTrace();
@@ -63,7 +65,7 @@ public class TestingFailingBlobServer extends BlobServer {
for (int num = 0; num < numFailures && !isShutdown(); num++) {
Socket socket = null;
try {
- socket = getServerSocket().accept();
+ socket = NetUtils.acceptWithoutTimeout(getServerSocket());
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
index e38b8e4..22655ec 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.net;
+import org.apache.flink.util.NetUtils;
+
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
@@ -78,7 +80,7 @@ public class ConnectionUtilsTest {
@Override
public void run() {
try {
- socket.accept();
+ NetUtils.acceptWithoutTimeout(socket);
} catch (IOException e) {
// ignore
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index 3b74857..06aad7e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.CheckedSupplier;
@@ -110,7 +111,9 @@ public class RestClientTest extends TestLogger {
// start server
final CompletableFuture<Socket> socketCompletableFuture =
-
CompletableFuture.supplyAsync(CheckedSupplier.unchecked(serverSocket::accept));
+ CompletableFuture.supplyAsync(
+ CheckedSupplier.unchecked(
+ () ->
NetUtils.acceptWithoutTimeout(serverSocket)));
final CompletableFuture<EmptyResponseBody> responseFuture =
restClient.sendRequest(
@@ -162,7 +165,9 @@ public class RestClientTest extends TestLogger {
// start server
final CompletableFuture<Socket> socketCompletableFuture =
-
CompletableFuture.supplyAsync(CheckedSupplier.unchecked(serverSocket::accept));
+ CompletableFuture.supplyAsync(
+ CheckedSupplier.unchecked(
+ () ->
NetUtils.acceptWithoutTimeout(serverSocket)));
final CompletableFuture<EmptyResponseBody> responseFuture =
restClient.sendRequest(
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
index fe810d5..042bbb8 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
@@ -37,6 +37,7 @@ import
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -382,7 +383,7 @@ public class CollectSinkFunction<IN> extends
RichSinkFunction<IN>
try {
if (connection == null) {
// waiting for coordinator to connect
- connection = serverSocket.accept();
+ connection =
NetUtils.acceptWithoutTimeout(serverSocket);
inStream = new
DataInputViewStreamWrapper(this.connection.getInputStream());
outStream =
new
DataOutputViewStreamWrapper(this.connection.getOutputStream());
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
index 5df5f69..69a72c3 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.experimental;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.NetUtils;
import java.io.EOFException;
import java.io.IOException;
@@ -171,7 +172,7 @@ public class SocketStreamIterator<T> implements Iterator<T>
{
private T readNextFromStream() throws Exception {
try {
if (inStream == null) {
- connectedSocket = socket.accept();
+ connectedSocket = NetUtils.acceptWithoutTimeout(socket);
inStream = new
DataInputViewStreamWrapper(connectedSocket.getInputStream());
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
index a8287a5..28e3e0d 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.sink;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.apache.commons.io.IOUtils;
@@ -88,7 +89,7 @@ public class SocketClientSinkTest extends TestLogger {
sinkRunner.start();
- Socket sk = server.accept();
+ Socket sk = NetUtils.acceptWithoutTimeout(server);
BufferedReader rdr = new BufferedReader(new
InputStreamReader(sk.getInputStream()));
String value = rdr.readLine();
@@ -132,7 +133,7 @@ public class SocketClientSinkTest extends TestLogger {
sinkRunner.start();
- Socket sk = server.accept();
+ Socket sk = NetUtils.acceptWithoutTimeout(server);
BufferedReader rdr = new BufferedReader(new
InputStreamReader(sk.getInputStream()));
String value = rdr.readLine();
@@ -163,7 +164,7 @@ public class SocketClientSinkTest extends TestLogger {
@Override
public void run() {
try {
- Socket sk = server.accept();
+ Socket sk =
NetUtils.acceptWithoutTimeout(server);
sk.close();
} catch (Throwable t) {
error.set(t);
@@ -221,7 +222,7 @@ public class SocketClientSinkTest extends TestLogger {
new Callable<Void>() {
@Override
public Void call() throws Exception {
- Socket socket = serverSocket[0].accept();
+ Socket socket =
NetUtils.acceptWithoutTimeout(serverSocket[0]);
BufferedReader reader =
new BufferedReader(
@@ -296,7 +297,7 @@ public class SocketClientSinkTest extends TestLogger {
throw new AssumptionViolatedException(
"Could not bind server to previous port.", be);
}
- Socket socket = serverSocket[0].accept();
+ Socket socket = NetUtils.acceptWithoutTimeout(serverSocket[0]);
BufferedReader reader =
new BufferedReader(new
InputStreamReader(socket.getInputStream()));
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
index 6b682c1..1354384 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.functions.source;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.NetUtils;
import org.apache.commons.io.IOUtils;
import org.junit.Test;
@@ -50,7 +51,7 @@ public class SocketTextStreamFunctionTest {
SocketSourceThread runner = new SocketSourceThread(source,
"test1", "check");
runner.start();
- channel = server.accept();
+ channel = NetUtils.acceptWithoutTimeout(server);
OutputStreamWriter writer = new
OutputStreamWriter(channel.getOutputStream());
writer.write("test1\n");
@@ -84,7 +85,7 @@ public class SocketTextStreamFunctionTest {
SocketSourceThread runner = new SocketSourceThread(source);
runner.start();
- channel = server.accept();
+ channel = NetUtils.acceptWithoutTimeout(server);
channel.close();
try {
@@ -113,22 +114,22 @@ public class SocketTextStreamFunctionTest {
runner.start();
// first connection: nothing
- channel = server.accept();
+ channel = NetUtils.acceptWithoutTimeout(server);
channel.close();
// second connection: first string
- channel = server.accept();
+ channel = NetUtils.acceptWithoutTimeout(server);
OutputStreamWriter writer = new
OutputStreamWriter(channel.getOutputStream());
writer.write("test1\n");
writer.close();
channel.close();
// third connection: nothing
- channel = server.accept();
+ channel = NetUtils.acceptWithoutTimeout(server);
channel.close();
// fourth connection: second string
- channel = server.accept();
+ channel = NetUtils.acceptWithoutTimeout(server);
writer = new OutputStreamWriter(channel.getOutputStream());
writer.write("check\n");
writer.flush();
@@ -157,22 +158,22 @@ public class SocketTextStreamFunctionTest {
runner.start();
// first connection: nothing
- channel = server.accept();
+ channel = NetUtils.acceptWithoutTimeout(server);
channel.close();
// second connection: first string
- channel = server.accept();
+ channel = NetUtils.acceptWithoutTimeout(server);
OutputStreamWriter writer = new
OutputStreamWriter(channel.getOutputStream());
writer.write("test1\n");
writer.close();
channel.close();
// third connection: nothing
- channel = server.accept();
+ channel = NetUtils.acceptWithoutTimeout(server);
channel.close();
// fourth connection: second string
- channel = server.accept();
+ channel = NetUtils.acceptWithoutTimeout(server);
writer = new OutputStreamWriter(channel.getOutputStream());
writer.write("check\n");
writer.flush();
@@ -201,22 +202,22 @@ public class SocketTextStreamFunctionTest {
runner.start();
// first connection: nothing
- channel = server.accept();
+ channel = NetUtils.acceptWithoutTimeout(server);
channel.close();
// second connection: first string
- channel = server.accept();
+ channel = NetUtils.acceptWithoutTimeout(server);
OutputStreamWriter writer = new
OutputStreamWriter(channel.getOutputStream());
writer.write("te");
writer.close();
channel.close();
// third connection: nothing
- channel = server.accept();
+ channel = NetUtils.acceptWithoutTimeout(server);
channel.close();
// fourth connection: second string
- channel = server.accept();
+ channel = NetUtils.acceptWithoutTimeout(server);
writer = new OutputStreamWriter(channel.getOutputStream());
writer.write("st1\n");
writer.write("check1\n");
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
index 8f0cc5d..2a16512 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
@@ -26,6 +26,7 @@ import
org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.streaming.api.operators.collect.utils.CollectTestUtils;
import org.apache.flink.types.Row;
+import org.apache.flink.util.NetUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -165,7 +166,7 @@ public class CollectSinkOperatorCoordinatorTest {
try {
while (running) {
if (socket == null) {
- socket = server.accept();
+ socket = NetUtils.acceptWithoutTimeout(server);
inStream = new
DataInputViewStreamWrapper(socket.getInputStream());
outStream = new
DataOutputViewStreamWrapper(socket.getOutputStream());
}
diff --git
a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
index 00a85f8..8835a66 100644
---
a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
+++
b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
@@ -51,6 +51,8 @@ public class EchoServer extends Thread implements
AutoCloseable {
public void run() {
while (!close) {
try {
+ // We are NOT using NetUtils.acceptWithoutTimeout here as this
ServerSocket sets
+ // a timeout.
EchoWorkerThread thread =
new EchoWorkerThread(serverSocket.accept(),
socketTimeout);
thread.start();