Author: cutting Date: Wed Jun 21 11:15:11 2006 New Revision: 416055 URL: http://svn.apache.org/viewvc?rev=416055&view=rev Log: HADOOP-210. Change RPC server to use a selector instead of a thread per connection. Contributed by Devaraj Das.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/conf/hadoop-default.xml lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=416055&r1=416054&r2=416055&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Wed Jun 21 11:15:11 2006 @@ -20,6 +20,12 @@ web ui. Also attempt to log a thread dump of child processes before they're killed. (omalley via cutting) + 6. HADOOP-210. Change RPC server to use a selector instead of a + thread per connection. This should make it easier to scale to + larger clusters. Note that this incompatibly changes the RPC + protocol: clients and servers must both be upgraded to the new + version to ensure correct operation. (Devaraj Das via cutting) + Release 0.3.2 - 2006-06-09 Modified: lucene/hadoop/trunk/conf/hadoop-default.xml URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=416055&r1=416054&r2=416055&view=diff ============================================================================== --- lucene/hadoop/trunk/conf/hadoop-default.xml (original) +++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Jun 21 11:15:11 2006 @@ -332,4 +332,28 @@ <description>Defines the timeout for IPC calls in milliseconds.</description> </property> +<property> + <name>ipc.client.idlethreshold</name> + <value>4000</value> + <description>Defines the threshold numner of connections after which + connections will be inspected for idleness. + </description> +</property> + +<property> + <name>ipc.client.maxidletime</name> + <value>120000</value> + <description>Defines the maximum idle time for a connected client after + which it may be disconnected. 
+ </description> +</property> + +<property> + <name>ipc.client.kill.max</name> + <value>10</value> + <description>Defines the maximum number of clients to disconnect in one go. + </description> +</property> + + </configuration> Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=416055&r1=416054&r2=416055&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Jun 21 11:15:11 2006 @@ -38,6 +38,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.UTF8; +import org.apache.hadoop.io.DataOutputBuffer; /** A client for an IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on @@ -196,8 +197,15 @@ LOG.debug(getName() + " sending #" + call.id); try { writingCall = call; - out.writeInt(call.id); - call.param.write(out); + DataOutputBuffer d = new DataOutputBuffer(); //for serializing the + //data to be written + d.writeInt(call.id); + call.param.write(d); + byte[] data = d.getData(); + int dataLength = d.getLength(); + + out.writeInt(dataLength); //first put the data length + out.write(data, 0, dataLength);//write the data out.flush(); } finally { writingCall = null; @@ -208,7 +216,7 @@ if (error) close(); // close on error } - } + } /** Close the connection and remove it from the pool. 
*/ public void close() { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=416055&r1=416054&r2=416055&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed Jun 21 11:15:11 2006 @@ -20,17 +20,26 @@ import java.io.EOFException; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.StringWriter; import java.io.PrintWriter; +import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.BufferUnderflowException; + +import java.net.InetSocketAddress; import java.net.Socket; -import java.net.ServerSocket; -import java.net.SocketException; -import java.net.SocketTimeoutException; +import java.util.Collections; import java.util.LinkedList; +import java.util.List; +import java.util.Iterator; +import java.util.Random; import org.apache.commons.logging.*; @@ -38,7 +47,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.io.UTF8; + +import org.mortbay.http.nio.SocketChannelOutputStream; /** An abstract IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. 
A service runs on @@ -65,6 +75,16 @@ private int handlerCount; // number of handler threads private int maxQueuedCalls; // max number of queued calls private Class paramClass; // class of call parameters + private int maxIdleTime; // the maximum idle time after + // which a client may be disconnected + private int thresholdIdleConnections; // the number of idle connections + // after which we will start + // cleaning up idle + // connections + int maxConnectionsToNuke; // the max number of + // connections to nuke + //during a cleanup + private Configuration conf; private int timeout; @@ -73,6 +93,12 @@ private LinkedList callQueue = new LinkedList(); // queued calls private Object callDequeued = new Object(); // used by wait/notify + private List connectionList = + Collections.synchronizedList(new LinkedList()); //maintain a list + //of client connectionss + private Listener listener; + private int numConnections = 0; + /** A call queued for handling. */ private static class Call { private int id; // the client's call id @@ -86,113 +112,323 @@ } } - /** Listens on the socket, starting new connection threads. */ + /** Listens on the socket. 
Creates jobs for the handler threads*/ private class Listener extends Thread { - private ServerSocket socket; - + + private ServerSocketChannel acceptChannel = null; //the accept channel + private Selector selector = null; //the selector that we use for the server + private InetSocketAddress address; //the address we bind at + private Random rand = new Random(); + private long lastCleanupRunTime = 0; //the last time when a cleanup connec- + //-tion (for idle connections) ran + private long cleanupInterval = 10000; //the minimum interval between + //two cleanup runs + public Listener() throws IOException { - this.socket = new ServerSocket(port); - socket.setSoTimeout(timeout); - this.setDaemon(true); + address = new InetSocketAddress(port); + // Create a new server socket and set to non blocking mode + acceptChannel = ServerSocketChannel.open(); + acceptChannel.configureBlocking(false); + + // Bind the server socket to the local host and port + acceptChannel.socket().bind(address); + // create a selector; + selector= Selector.open(); + + // Register accepts on the server socket with the selector. + acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("Server listener on port " + port); + this.setDaemon(true); + } + /** cleanup connections from connectionList. Choose a random range + * to scan and also have a limit on the number of the connections + * that will be cleanedup per run. The criteria for cleanup is the time + * for which the connection was idle. If 'force' is true then all + * connections will be looked at for the cleanup. 
+ */ + private void cleanupConnections(boolean force) { + if (force || numConnections > thresholdIdleConnections) { + long currentTime = System.currentTimeMillis(); + if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) { + return; + } + int start = 0; + int end = numConnections - 1; + if (!force) { + start = rand.nextInt() % numConnections; + end = rand.nextInt() % numConnections; + int temp; + if (end < start) { + temp = start; + start = end; + end = temp; + } + } + int i = start; + int numNuked = 0; + while (i <= end) { + Connection c; + synchronized (connectionList) { + try { + c = (Connection)connectionList.get(i); + } catch (Exception e) {return;} + } + if (c.timedOut(currentTime)) { + synchronized (connectionList) { + if (connectionList.remove(c)) + numConnections--; + } + try { + LOG.info(getName() + ": disconnecting client " + c.getHostAddress()); + c.close(); + } catch (Exception e) {} + numNuked++; + end--; + c = null; + if (!force && numNuked == maxConnectionsToNuke) break; + } + else i++; + } + lastCleanupRunTime = System.currentTimeMillis(); + } } public void run() { LOG.info(getName() + ": starting"); + SERVER.set(Server.this); while (running) { - Socket acceptedSock = null; + SelectionKey key = null; try { - acceptedSock = socket.accept(); - new Connection(acceptedSock).start(); // start a new connection - } catch (SocketTimeoutException e) { // ignore timeouts + selector.select(); + Iterator iter = selector.selectedKeys().iterator(); + + while (iter.hasNext()) { + key = (SelectionKey)iter.next(); + if (key.isAcceptable()) + doAccept(key); + else if (key.isReadable()) + doRead(key); + iter.remove(); + key = null; + } } catch (OutOfMemoryError e) { // we can run out of memory if we have too many threads // log the event and sleep for a minute and give // some thread(s) a chance to finish - LOG.warn(getName() + " out of memory, sleeping...", e); + closeCurrentConnection(key, e); + cleanupConnections(true); + try { Thread.sleep(60000); } 
catch (Exception ie) {} + } catch (Exception e) { + closeCurrentConnection(key, e); + } + cleanupConnections(false); + } + LOG.info("Stopping " + this.getName()); + + try { + if (acceptChannel != null) + acceptChannel.close(); + if (selector != null) + selector.close(); + } catch (IOException e) { } + + selector= null; + acceptChannel= null; + connectionList = null; + } + + private void closeCurrentConnection(SelectionKey key, Throwable e) { + if (running) { + LOG.warn("selector: " + e); + e.printStackTrace(); + } + if (key != null) { + Connection c = (Connection)key.attachment(); + if (c != null) { + synchronized (connectionList) { + if (connectionList.remove(c)) + numConnections--; + } try { - acceptedSock.close(); - Thread.sleep(60000); - } catch (InterruptedException ie) { // ignore interrupts - } catch (IOException ioe) { // ignore IOexceptions - } + LOG.info(getName() + ": disconnecting client " + c.getHostAddress()); + c.close(); + } catch (Exception ex) {} + c = null; } - catch (Exception e) { // log all other exceptions - LOG.info(getName() + " caught: " + e, e); - } } + } + + void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { + Connection c = null; + ServerSocketChannel server = (ServerSocketChannel) key.channel(); + SocketChannel channel = server.accept(); + channel.configureBlocking(false); + SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ); + c = new Connection(readKey, channel, System.currentTimeMillis()); + readKey.attach(c); + synchronized (connectionList) { + connectionList.add(numConnections, c); + numConnections++; + } + LOG.info("Server connection on port " + port + " from " + + c.getHostAddress() + + ": starting. 
Number of active connections: " + numConnections); + } + + void doRead(SelectionKey key) { + int count = 0; + if (!key.isValid() || !key.isReadable()) + return; + Connection c = (Connection)key.attachment(); + if (c == null) { + return; + } + c.setLastContact(System.currentTimeMillis()); + try { - socket.close(); - } catch (IOException e) { - LOG.info(getName() + ": e=" + e); + count = c.readAndProcess(); + } catch (Exception e) { + LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count); + e.printStackTrace(); + count = -1; //so that the (count < 0) block is executed } - LOG.info(getName() + ": exiting"); + if (count < 0) { + synchronized (connectionList) { + if (connectionList.remove(c)) + numConnections--; + } + try { + LOG.info(getName() + ": disconnecting client " + + c.getHostAddress() + ". Number of active connections: "+ + numConnections); + c.close(); + } catch (Exception e) {} + c = null; + } + else { + c.setLastContact(System.currentTimeMillis()); + } + } + + void doStop() + { + selector.wakeup(); + Thread.yield(); } } /** Reads calls from a connection and queues them for handling. 
*/ - private class Connection extends Thread { - private Socket socket; - private DataInputStream in; + private class Connection { + private SocketChannel channel; + private SelectionKey key; + private ByteBuffer data; + private ByteBuffer dataLengthBuffer; private DataOutputStream out; + private SocketChannelOutputStream channelOut; + private long lastContact; + private int dataLength; + private Socket socket; - public Connection(Socket socket) throws IOException { - this.socket = socket; - socket.setSoTimeout(timeout); - this.in = new DataInputStream - (new BufferedInputStream(socket.getInputStream())); + public Connection(SelectionKey key, SocketChannel channel, + long lastContact) { + this.key = key; + this.channel = channel; + this.lastContact = lastContact; + this.data = null; + this.dataLengthBuffer = null; + this.socket = channel.socket(); this.out = new DataOutputStream - (new BufferedOutputStream(socket.getOutputStream())); - this.setDaemon(true); - this.setName("Server connection on port " + port + " from " - + socket.getInetAddress().getHostAddress()); + (new BufferedOutputStream( + this.channelOut = new SocketChannelOutputStream(channel, 4096))); + } + + public String getHostAddress() { + return socket.getInetAddress().getHostAddress(); } - public void run() { - LOG.info(getName() + ": starting"); - SERVER.set(Server.this); - try { - while (running) { - int id; - try { - id = in.readInt(); // try to read an id - } catch (SocketTimeoutException e) { - continue; - } - - if (LOG.isDebugEnabled()) - LOG.debug(getName() + " got #" + id); - - Writable param = makeParam(); // read param - param.readFields(in); + public void setLastContact(long lastContact) { + this.lastContact = lastContact; + } + + public long getLastContact() { + return lastContact; + } + + private boolean timedOut() { + if(System.currentTimeMillis() - lastContact > maxIdleTime) + return true; + return false; + } + + private boolean timedOut(long currentTime) { + if(currentTime - 
lastContact > maxIdleTime) + return true; + return false; + } + + public int readAndProcess() throws IOException, InterruptedException { + int count = -1; + if (dataLengthBuffer == null) + dataLengthBuffer = ByteBuffer.allocateDirect(4); + if (dataLengthBuffer.remaining() > 0) { + count = channel.read(dataLengthBuffer); + if (count < 0) return count; + if (dataLengthBuffer.remaining() == 0) { + dataLengthBuffer.flip(); + dataLength = dataLengthBuffer.getInt(); + data = ByteBuffer.allocateDirect(dataLength); + } + //return count; + } + count = channel.read(data); + if (data.remaining() == 0) { + data.flip(); + processData(); + data = dataLengthBuffer = null; + } + return count; + } + + private void processData() throws IOException, InterruptedException { + byte[] bytes = new byte[dataLength]; + data.get(bytes); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes)); + int id = dis.readInt(); // try to read an id - Call call = new Call(id, param, this); + if (LOG.isDebugEnabled()) + LOG.debug(" got #" + id); + + Writable param = makeParam(); // read param + param.readFields(dis); - synchronized (callQueue) { - callQueue.addLast(call); // queue the call - callQueue.notify(); // wake up a waiting handler - } + Call call = new Call(id, param, this); + synchronized (callQueue) { + callQueue.addLast(call); // queue the call + callQueue.notify(); // wake up a waiting handler + } - while (running && callQueue.size() >= maxQueuedCalls) { - synchronized (callDequeued) { // queue is full - callDequeued.wait(timeout); // wait for a dequeue - } - } + while (running && callQueue.size() >= maxQueuedCalls) { + synchronized (callDequeued) { // queue is full + callDequeued.wait(timeout); // wait for a dequeue } - } catch (EOFException eof) { - // This is what happens on linux when the other side shuts down - } catch (SocketException eof) { - // This is what happens on Win32 when the other side shuts down - } catch (Exception e) { - LOG.info(getName() + " caught: 
" + e, e); - } finally { - try { - socket.close(); - } catch (IOException e) {} - LOG.info(getName() + ": exiting"); } } + private void close() throws IOException { + data = null; + dataLengthBuffer = null; + if (!channel.isOpen()) + return; + try {socket.shutdownOutput();} catch(Exception e) {} + try {out.close();} catch(Exception e) {} + try {channelOut.destroy();} catch(Exception e) {} + if (channel.isOpen()) { + try {channel.close();} catch(Exception e) {} + } + try {socket.close();} catch(Exception e) {} + try {key.cancel();} catch(Exception e) {} + key = null; + } } /** Handles queued calls . */ @@ -237,15 +473,24 @@ DataOutputStream out = call.connection.out; synchronized (out) { - out.writeInt(call.id); // write call id - out.writeBoolean(error!=null); // write error flag - if (error == null) { - value.write(out); - } else { - WritableUtils.writeString(out, errorClass); - WritableUtils.writeString(out, error); + try { + out.writeInt(call.id); // write call id + out.writeBoolean(error!=null); // write error flag + if (error == null) { + value.write(out); + } else { + WritableUtils.writeString(out, errorClass); + WritableUtils.writeString(out, error); + } + out.flush(); + } catch (Exception e) { + e.printStackTrace(); + synchronized (connectionList) { + if (connectionList.remove(call.connection)) + numConnections--; + } + call.connection.close(); } - out.flush(); } } catch (Exception e) { @@ -275,7 +520,10 @@ this.paramClass = paramClass; this.handlerCount = handlerCount; this.maxQueuedCalls = handlerCount; - this.timeout = conf.getInt("ipc.client.timeout",10000); + this.timeout = conf.getInt("ipc.client.timeout",10000); + this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000); + this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10); + this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000); } /** Sets the timeout used for network i/o. */ @@ -283,7 +531,7 @@ /** Starts the service. 
Must be called before any calls will be handled. */ public synchronized void start() throws IOException { - Listener listener = new Listener(); + listener = new Listener(); listener.start(); for (int i = 0; i < handlerCount; i++) { @@ -298,6 +546,7 @@ public synchronized void stop() { LOG.info("Stopping server on " + port); running = false; + listener.doStop(); try { Thread.sleep(timeout); // inexactly wait for pending requests to finish } catch (InterruptedException e) {}