Repository: hadoop Updated Branches: refs/heads/branch-2 cabf97ae4 -> 3a01984bd
HADOOP-9137. Support connection limiting in IPC server. Contributed by Kihwal Lee. (cherry picked from commit 8dc59cb9e0f8d300991a437c1b42f1e4e495cfe4) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3a01984b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3a01984b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3a01984b Branch: refs/heads/branch-2 Commit: 3a01984bdad8fd80d9af6e926780913b10d1b772 Parents: cabf97a Author: Kihwal Lee <[email protected]> Authored: Fri Jan 30 17:22:48 2015 -0600 Committer: Kihwal Lee <[email protected]> Committed: Fri Jan 30 17:22:48 2015 -0600 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 2 + .../fs/CommonConfigurationKeysPublic.java | 5 ++ .../main/java/org/apache/hadoop/ipc/Server.java | 19 ++++++++ .../src/main/resources/core-default.xml | 11 +++++ .../java/org/apache/hadoop/ipc/TestIPC.java | 51 ++++++++++++++++++++ 5 files changed, 88 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a01984b/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 217f06e..436509f 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -158,6 +158,8 @@ Release 2.7.0 - UNRELEASED HADOOP-11441. Hadoop-azure: Change few methods scope to public. (Shashank Khandelwal via cnauroth) + HADOOP-9137. Support connection limiting in IPC server (kihwal) + OPTIMIZATIONS HADOOP-11323. WritableComparator#compare keeps reference to byte array. http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a01984b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index d287bdd..459b984 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -227,6 +227,11 @@ public class CommonConfigurationKeysPublic { "ipc.server.tcpnodelay"; /** Default value for IPC_SERVER_TCPNODELAY_KEY */ public static final boolean IPC_SERVER_TCPNODELAY_DEFAULT = true; + /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */ + public static final String IPC_SERVER_MAX_CONNECTIONS_KEY = + "ipc.server.max.connections"; + /** Default value for IPC_SERVER_MAX_CONNECTIONS_KEY */ + public static final int IPC_SERVER_MAX_CONNECTIONS_DEFAULT = 0; /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */ public static final String HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY = http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a01984b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 60e86c2..3c710f3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -746,6 +746,13 @@ public abstract class Server { Reader reader = getReader(); Connection c = connectionManager.register(channel); + // If the connectionManager can't take it, close the connection. + if (c == null) { + if (channel.isOpen()) { + IOUtils.cleanup(null, channel); + } + continue; + } key.attach(c); // so closeCurrentConnection can get the object reader.addConnection(c); } @@ -2645,6 +2652,7 @@ public abstract class Server { final private int idleScanInterval; final private int maxIdleTime; final private int maxIdleToClose; + final private int maxConnections; ConnectionManager() { this.idleScanTimer = new Timer( @@ -2661,6 +2669,9 @@ public abstract class Server { this.maxIdleToClose = conf.getInt( CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY, CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT); + this.maxConnections = conf.getInt( + CommonConfigurationKeysPublic.IPC_SERVER_MAX_CONNECTIONS_KEY, + CommonConfigurationKeysPublic.IPC_SERVER_MAX_CONNECTIONS_DEFAULT); // create a set with concurrency -and- a thread-safe iterator, add 2 // for listener and idle closer threads this.connections = Collections.newSetFromMap( @@ -2688,11 +2699,19 @@ public abstract class Server { return count.get(); } + boolean isFull() { + // The check is disabled when maxConnections <= 0. + return ((maxConnections > 0) && (size() >= maxConnections)); + } + Connection[] toArray() { return connections.toArray(new Connection[0]); } Connection register(SocketChannel channel) { + if (isFull()) { + return null; + } Connection connection = new Connection(channel, Time.now()); add(connection); if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a01984b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 4a9c3da..0a0bdfe 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -1670,4 +1670,15 @@ for ldap providers in the same way as above does. </description> </property> + <property> + <name>ipc.server.max.connections</name> + <value>0</value> + <description>The maximum number of concurrent connections a server is allowed + to accept. If this limit is exceeded, incoming connections will first fill + the listen queue and then may go to an OS-specific listen overflow queue. + The client may fail or timeout, but the server can avoid running out of file + descriptors using this feature. 0 means no limit. + </description> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a01984b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 02516a1..04a7412 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -1184,6 +1184,57 @@ public class TestIPC { } } + @Test + public void testMaxConnections() throws Exception { + conf.setInt("ipc.server.max.connections", 5); + Server server = null; + Thread connectors[] = new Thread[10]; + + try { + server = new TestServer(3, false); + final InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + assertEquals(0, server.getNumOpenConnections()); + + for (int i = 0; i < 10; i++) { + connectors[i] = new Thread() { + @Override + public void run() { + Socket sock = null; + try { + sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); + NetUtils.connect(sock, addr, 3000); + try { + Thread.sleep(4000); + } catch (InterruptedException ie) { } + } catch (IOException ioe) { + } finally { + if (sock != null) { + try { + sock.close(); + } catch (IOException ioe) { } + } + } + } + }; + connectors[i].start(); + } + + Thread.sleep(1000); + // server should only accept up to 5 connections + assertEquals(5, server.getNumOpenConnections()); + + for (int i = 0; i < 10; i++) { + connectors[i].join(); + } + } finally { + if (server != null) { + server.stop(); + } + conf.setInt("ipc.server.max.connections", 0); + } + } + private void assertRetriesOnSocketTimeouts(Configuration conf, int maxTimeoutRetries) throws IOException { SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
