This is an automated email from the ASF dual-hosted git repository.
iwasakims pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6ed0158 HADOOP-18024. SocketChannel is not closed when IOException
happens in Server$Listener.doAccept (#3719)
6ed0158 is described below
commit 6ed01585eb1929497efbafe2f19bda4f1a56575c
Author: Haoze Wu <[email protected]>
AuthorDate: Wed Dec 8 04:48:43 2021 -0500
HADOOP-18024. SocketChannel is not closed when IOException happens in
Server$Listener.doAccept (#3719)
---
.../main/java/org/apache/hadoop/ipc/Server.java | 25 ++++++++++--
.../test/java/org/apache/hadoop/ipc/TestIPC.java | 47 ++++++++++++++++++++++
2 files changed, 68 insertions(+), 4 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 7a391c4..7852956 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -1324,6 +1324,14 @@ public abstract class Server {
}
}
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ protected void configureSocketChannel(SocketChannel channel) throws IOException {
+ channel.configureBlocking(false);
+ channel.socket().setTcpNoDelay(tcpNoDelay);
+ channel.socket().setKeepAlive(true);
+ }
+
/** Listens on the socket. Creates jobs for the handler threads*/
private class Listener extends Thread {
@@ -1530,15 +1538,24 @@ public abstract class Server {
InetSocketAddress getAddress() {
return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
}
-
+
void doAccept(SelectionKey key) throws InterruptedException, IOException,
OutOfMemoryError {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
- channel.configureBlocking(false);
- channel.socket().setTcpNoDelay(tcpNoDelay);
- channel.socket().setKeepAlive(true);
+ try {
+ configureSocketChannel(channel);
+ } catch (IOException e) {
+ LOG.warn("Error in an accepted SocketChannel", e);
+ try {
+ channel.socket().close();
+ channel.close();
+ } catch (IOException ex) {
+ LOG.warn("Error in closing SocketChannel", ex);
+ }
+ continue;
+ }
Reader reader = getReader();
Connection c = connectionManager.register(channel,
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index 95ff302..90415b4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -34,6 +34,7 @@ import static org.mockito.Mockito.verify;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -48,6 +49,7 @@ import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -616,6 +618,51 @@ public class TestIPC {
WRITABLE_FAULTS_SLEEP = 0;
}
}
+
+ /**
+ * Test for HADOOP-18024.
+ */
+ @Test(timeout=60000)
+ public void testIOEOnListenerAccept() throws Exception {
+ // start server
+ Server server = new TestServer(1, false,
+ LongWritable.class, LongWritable.class) {
+ @Override
+ protected void configureSocketChannel(SocketChannel channel) throws IOException {
+ maybeThrowIOE();
+ super.configureSocketChannel(channel);
+ }
+ };
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ server.start();
+
+ // start client
+ WRITABLE_FAULTS_ENABLED = true;
+ Client client = new Client(LongWritable.class, conf);
+ try {
+ LongWritable param = LongWritable.class.newInstance();
+
+ try {
+ call(client, param, addr, 0, conf);
+ fail("Expected an exception to have been thrown");
+ } catch (EOFException e) {
+ LOG.info("Got expected exception", e);
+ } catch (Throwable t) {
+ LOG.warn("Got unexpected error", t);
+ fail("Expected an EOFException to have been thrown");
+ }
+
+ // Doing a second call with faults disabled should return fine --
+ // ie the internal state of the client or server should not be broken
+ // by the failed call
+ WRITABLE_FAULTS_ENABLED = false;
+ call(client, param, addr, 0, conf);
+
+ } finally {
+ client.stop();
+ server.stop();
+ }
+ }
private static void assertExceptionContains(
Throwable t, String substring) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]