Author: cutting Date: Tue Jun 13 12:18:43 2006 New Revision: 413958 URL: http://svn.apache.org/viewvc?rev=413958&view=rev Log: HADOOP-210. Change RPC server to use a selector instead of a thread per connection. This should make it easier to scale to larger clusters. Contributed by Devaraj Das.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=413958&r1=413957&r2=413958&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Tue Jun 13 12:18:43 2006 @@ -1,6 +1,13 @@ Hadoop Change Log +Trunk (unreleased changes) + + 1. HADOOP-210. Change RPC client and server so that server uses a + selector, and a thread per connection is no longer required. This + should permit larger clusters. (Devaraj Das via cutting) + + Release 0.3.2 - 2006-06-09 1. HADOOP-275. Update the streaming contrib module to use log4j for Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=413958&r1=413957&r2=413958&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Tue Jun 13 12:18:43 2006 @@ -38,6 +38,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.UTF8; +import org.apache.hadoop.io.DataOutputBuffer; /** A client for an IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. 
A service runs on @@ -196,8 +197,15 @@ LOG.debug(getName() + " sending #" + call.id); try { writingCall = call; - out.writeInt(call.id); - call.param.write(out); + DataOutputBuffer d = new DataOutputBuffer(); //for serializing the + //data to be written + d.writeInt(call.id); + call.param.write(d); + byte[] data = d.getData(); + int dataLength = d.getLength(); + + out.writeInt(dataLength); //first put the data length + out.write(data, 0, dataLength);//write the data out.flush(); } finally { writingCall = null; @@ -208,7 +216,7 @@ if (error) close(); // close on error } - } + } /** Close the connection and remove it from the pool. */ public void close() { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=413958&r1=413957&r2=413958&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Tue Jun 13 12:18:43 2006 @@ -20,17 +20,23 @@ import java.io.EOFException; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; import java.io.StringWriter; import java.io.PrintWriter; +import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.BufferUnderflowException; + +import java.net.InetSocketAddress; import java.net.Socket; -import java.net.ServerSocket; -import java.net.SocketException; -import java.net.SocketTimeoutException; import java.util.LinkedList; +import java.util.Iterator; +import java.util.Random; import org.apache.commons.logging.*; @@ -38,7 +44,8 @@ import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.io.UTF8; + +import org.mortbay.http.nio.SocketChannelOutputStream; /** An abstract IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on @@ -65,6 +72,16 @@ private int handlerCount; // number of handler threads private int maxQueuedCalls; // max number of queued calls private Class paramClass; // class of call parameters + private int maxIdleTime; // the maximum idle time after + // which a client may be disconnected + private int thresholdIdleConnections; // the number of idle connections + // after which we will start + // cleaning up idle + // connections + int maxConnectionsToNuke; // the max number of + // connections to nuke + //during a cleanup + private Configuration conf; private int timeout; @@ -73,6 +90,12 @@ private LinkedList callQueue = new LinkedList(); // queued calls private Object callDequeued = new Object(); // used by wait/notify + private InetSocketAddress address; //the address we bind at + private ServerSocketChannel acceptChannel = null; //the (main) accept channel + private Selector selector = null; //the selector that we use for the server + private Listener listener; + private int numConnections = 0; + /** A call queued for handling. */ private static class Call { private int id; // the client's call id @@ -86,113 +109,300 @@ } } - /** Listens on the socket, starting new connection threads. */ + /** Listens on the socket. 
Creates jobs for the handler threads*/ private class Listener extends Thread { - private ServerSocket socket; - - public Listener() throws IOException { - this.socket = new ServerSocket(port); - socket.setSoTimeout(timeout); + + private LinkedList connectionList = new LinkedList(); //maintain a list + //of client connectionss + private Random rand = new Random(); + private long lastCleanupRunTime = 0; //the last time when a cleanup connec- + //-tion (for idle connections) ran + private int cleanupInterval = 10000; //the minimum interval between + //two cleanup runs + + public Listener() { + address = new InetSocketAddress(port); this.setDaemon(true); - this.setName("Server listener on port " + port); + } + /** cleanup connections from connectionList. Choose a random range + * to scan and also have a limit on the number of the connections + * that will be cleanedup per run. The criteria for cleanup is the time + * for which the connection was idle. If 'force' is true then all + * connections will be looked at for the cleanup. 
+ */ + private void cleanupConnections(boolean force) { + if (force || numConnections > thresholdIdleConnections) { + long currentTime = System.currentTimeMillis(); + if (!force && (int)(currentTime - lastCleanupRunTime) < cleanupInterval) { + return; + } + int start = 0; + int end = numConnections - 1; + if (!force) { + start = rand.nextInt() % numConnections; + end = rand.nextInt() % numConnections; + int temp; + if (end < start) { + temp = start; + start = end; + end = temp; + } + } + int i = start; + int numNuked = 0; + while (i <= end) { + Connection c = (Connection)connectionList.get(i); + if (c.timedOut(currentTime)) { + connectionList.remove(i); + try { + LOG.info(getName() + ": disconnecting client " + c.getHostAddress()); + c.close(); + } catch (Exception e) {} + numNuked++; + end--; + if (!force && numNuked == maxConnectionsToNuke) break; + } + else i++; + } + lastCleanupRunTime = System.currentTimeMillis(); + } } public void run() { - LOG.info(getName() + ": starting"); - while (running) { - Socket acceptedSock = null; - try { - acceptedSock = socket.accept(); - new Connection(acceptedSock).start(); // start a new connection - } catch (SocketTimeoutException e) { // ignore timeouts - } catch (OutOfMemoryError e) { - // we can run out of memory if we have too many threads - // log the event and sleep for a minute and give - // some thread(s) a chance to finish - LOG.warn(getName() + " out of memory, sleeping...", e); + SERVER.set(Server.this); + + try { + // Create a new server socket and set to non blocking mode + acceptChannel = ServerSocketChannel.open(); + acceptChannel.configureBlocking(false); + + // Bind the server socket to the local host and port + acceptChannel.socket().bind(address); + + // create a selector; + selector= Selector.open(); + + // Register accepts on the server socket with the selector. 
+ acceptChannel.register(selector, SelectionKey.OP_ACCEPT); + this.setName("Server listener on port " + port); + + LOG.info(getName() + ": starting"); + + while (running) { + SelectionKey key = null; try { - acceptedSock.close(); + selector.select(timeout); + Iterator iter = selector.selectedKeys().iterator(); + + while (iter.hasNext()) { + key = (SelectionKey)iter.next(); + if (key.isAcceptable()) + doAccept(key); + else if (key.isReadable()) + doRead(key); + iter.remove(); + key = null; + } + } catch (OutOfMemoryError e) { + closeCurrentConnection(key, e); + cleanupConnections(true); Thread.sleep(60000); - } catch (InterruptedException ie) { // ignore interrupts - } catch (IOException ioe) { // ignore IOexceptions - } + } catch (Exception e) { + closeCurrentConnection(key, e); + } + cleanupConnections(false); } - catch (Exception e) { // log all other exceptions - LOG.info(getName() + " caught: " + e, e); - } + } catch (Exception e) { + LOG.fatal("selector",e); } + LOG.info("Stopping " + this.getName()); + try { - socket.close(); - } catch (IOException e) { - LOG.info(getName() + ": e=" + e); + if (acceptChannel != null) + acceptChannel.close(); + if (selector != null) + selector.close(); + } catch (IOException e) { } + + selector= null; + acceptChannel= null; + connectionList = null; + } + + private void closeCurrentConnection(SelectionKey key, Throwable e) { + if (running) { + LOG.warn("selector: " + e); + e.printStackTrace(); } - LOG.info(getName() + ": exiting"); + if (key != null) { + Connection c = (Connection)key.attachment(); + if (c != null) { + connectionList.remove(c); + try { + LOG.info(getName() + ": disconnecting client " + c.getHostAddress()); + c.close(); + } catch (Exception ex) {} + } + } + } + + void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { + Connection c = null; + ServerSocketChannel server = (ServerSocketChannel) key.channel(); + SocketChannel channel = server.accept(); + channel.configureBlocking(false); + 
SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ); + c = new Connection(readKey, channel, System.currentTimeMillis()); + readKey.attach(c); + connectionList.addLast(c); + numConnections++; + LOG.info("Server connection on port " + port + " from " + + c.getHostAddress() + ": starting"); + } + + void doRead(SelectionKey key) { + int count = 0; + if (!key.isValid() || !key.isReadable()) + return; + Connection c = (Connection)key.attachment(); + if (c == null) { + return; + } + c.setLastContact(System.currentTimeMillis()); + + try { + count = c.readAndProcess(); + } catch (Exception e) { + LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count); + count = -1; //so that the (count < 0) block is executed + } + if (count < 0) { + connectionList.remove(c); + try { + LOG.info(getName() + ": disconnecting client " + c.getHostAddress()); + c.close(); + } catch (Exception e) {} + } + else { + c.setLastContact(System.currentTimeMillis()); + } + } + + void doStop() + { + selector.wakeup(); + Thread.yield(); } } /** Reads calls from a connection and queues them for handling. 
*/ - private class Connection extends Thread { - private Socket socket; - private DataInputStream in; + private class Connection { + private SocketChannel channel; + private SelectionKey key; + private ByteBuffer data; + private ByteBuffer dataLengthBuffer; private DataOutputStream out; + private long lastContact; + private int dataLength; + private Socket socket; - public Connection(Socket socket) throws IOException { - this.socket = socket; - socket.setSoTimeout(timeout); - this.in = new DataInputStream - (new BufferedInputStream(socket.getInputStream())); + public Connection(SelectionKey key, SocketChannel channel, + long lastContact) { + this.key = key; + this.channel = channel; + this.lastContact = lastContact; + this.data = null; + this.dataLengthBuffer = null; + this.socket = channel.socket(); this.out = new DataOutputStream - (new BufferedOutputStream(socket.getOutputStream())); - this.setDaemon(true); - this.setName("Server connection on port " + port + " from " - + socket.getInetAddress().getHostAddress()); + (new SocketChannelOutputStream(channel, 4096)); + } + + public String getHostAddress() { + return socket.getInetAddress().getHostAddress(); } - public void run() { - LOG.info(getName() + ": starting"); - SERVER.set(Server.this); - try { - while (running) { - int id; - try { - id = in.readInt(); // try to read an id - } catch (SocketTimeoutException e) { - continue; - } - - if (LOG.isDebugEnabled()) - LOG.debug(getName() + " got #" + id); - - Writable param = makeParam(); // read param - param.readFields(in); + public void setLastContact(long lastContact) { + this.lastContact = lastContact; + } + + public long getLastContact() { + return lastContact; + } + + private boolean timedOut() { + if(System.currentTimeMillis() - lastContact > maxIdleTime) + return true; + return false; + } + + private boolean timedOut(long currentTime) { + if(currentTime - lastContact > timeout) + return true; + return false; + } + + public int readAndProcess() throws 
IOException, InterruptedException { + int count = -1; + if (dataLengthBuffer == null) + dataLengthBuffer = ByteBuffer.allocateDirect(4); + if (dataLengthBuffer.remaining() > 0) { + count = channel.read(dataLengthBuffer); + if (count < 0) return count; + if (dataLengthBuffer.remaining() == 0) { + dataLengthBuffer.flip(); + dataLength = dataLengthBuffer.getInt(); + data = ByteBuffer.allocateDirect(dataLength); + } + return count; + } + count = channel.read(data); + if (data.remaining() == 0) { + data.flip(); + processData(); + data = dataLengthBuffer = null; + } + return count; + } + + private void processData() throws IOException, InterruptedException { + byte[] bytes = new byte[dataLength]; + data.get(bytes); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes)); + int id = dis.readInt(); // try to read an id - Call call = new Call(id, param, this); + if (LOG.isDebugEnabled()) + LOG.debug(" got #" + id); + + Writable param = makeParam(); // read param + param.readFields(dis); - synchronized (callQueue) { - callQueue.addLast(call); // queue the call - callQueue.notify(); // wake up a waiting handler - } + Call call = new Call(id, param, this); + synchronized (callQueue) { + callQueue.addLast(call); // queue the call + callQueue.notify(); // wake up a waiting handler + } - while (running && callQueue.size() >= maxQueuedCalls) { - synchronized (callDequeued) { // queue is full - callDequeued.wait(timeout); // wait for a dequeue - } - } + while (running && callQueue.size() >= maxQueuedCalls) { + synchronized (callDequeued) { // queue is full + callDequeued.wait(timeout); // wait for a dequeue } - } catch (EOFException eof) { - // This is what happens on linux when the other side shuts down - } catch (SocketException eof) { - // This is what happens on Win32 when the other side shuts down - } catch (Exception e) { - LOG.info(getName() + " caught: " + e, e); - } finally { - try { - socket.close(); - } catch (IOException e) {} - LOG.info(getName() + 
": exiting"); } } + private void close() throws IOException { + data = null; + dataLengthBuffer = null; + if (!channel.isOpen()) + return; + socket.shutdownOutput(); + channel.close(); + socket.close(); + channel.close(); + out.close(); + key.cancel(); + numConnections--; + } } /** Handles queued calls . */ @@ -245,7 +455,6 @@ WritableUtils.writeString(out, errorClass); WritableUtils.writeString(out, error); } - out.flush(); } } catch (Exception e) { @@ -275,7 +484,10 @@ this.paramClass = paramClass; this.handlerCount = handlerCount; this.maxQueuedCalls = handlerCount; - this.timeout = conf.getInt("ipc.client.timeout",10000); + this.timeout = conf.getInt("ipc.client.timeout",10000); + this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000); + this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10); + this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 120000); } /** Sets the timeout used for network i/o. */ @@ -283,7 +495,7 @@ /** Starts the service. Must be called before any calls will be handled. */ public synchronized void start() throws IOException { - Listener listener = new Listener(); + listener = new Listener(); listener.start(); for (int i = 0; i < handlerCount; i++) { @@ -298,6 +510,7 @@ public synchronized void stop() { LOG.info("Stopping server on " + port); running = false; + listener.doStop(); try { Thread.sleep(timeout); // inexactly wait for pending requests to finish } catch (InterruptedException e) {}