Author: cutting Date: Wed Jun 14 15:41:25 2006 New Revision: 414404 URL: http://svn.apache.org/viewvc?rev=414404&view=rev Log: Reverting patch for HADOOP-210, which was causing problems.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=414404&r1=414403&r2=414404&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Wed Jun 14 15:41:25 2006 @@ -3,11 +3,7 @@ Trunk (unreleased changes) - 1. HADOOP-210. Change RPC client and server so that server uses a - selector, and a thread per connection is no longer required. This - should permit larger clusters. (Devaraj Das via cutting) - - 2. HADOOP-298. Improved progress reports for CopyFiles utility, the + 1. HADOOP-298. Improved progress reports for CopyFiles utility, the distributed file copier. (omalley via cutting) Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=414404&r1=414403&r2=414404&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Jun 14 15:41:25 2006 @@ -38,7 +38,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.UTF8; -import org.apache.hadoop.io.DataOutputBuffer; /** A client for an IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. 
A service runs on @@ -197,15 +196,8 @@ LOG.debug(getName() + " sending #" + call.id); try { writingCall = call; - DataOutputBuffer d = new DataOutputBuffer(); //for serializing the - //data to be written - d.writeInt(call.id); - call.param.write(d); - byte[] data = d.getData(); - int dataLength = d.getLength(); - - out.writeInt(dataLength); //first put the data length - out.write(data, 0, dataLength);//write the data + out.writeInt(call.id); + call.param.write(out); out.flush(); } finally { writingCall = null; @@ -216,7 +208,7 @@ if (error) close(); // close on error } - } + } /** Close the connection and remove it from the pool. */ public void close() { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=414404&r1=414403&r2=414404&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed Jun 14 15:41:25 2006 @@ -20,23 +20,17 @@ import java.io.EOFException; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.StringWriter; import java.io.PrintWriter; -import java.io.ByteArrayInputStream; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.nio.BufferUnderflowException; - -import java.net.InetSocketAddress; import java.net.Socket; +import java.net.ServerSocket; +import java.net.SocketException; +import java.net.SocketTimeoutException; import java.util.LinkedList; -import java.util.Iterator; -import java.util.Random; import org.apache.commons.logging.*; @@ -44,8 +38,7 @@ import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; - -import org.mortbay.http.nio.SocketChannelOutputStream; +import org.apache.hadoop.io.UTF8; /** An abstract IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on @@ -72,16 +65,6 @@ private int handlerCount; // number of handler threads private int maxQueuedCalls; // max number of queued calls private Class paramClass; // class of call parameters - private int maxIdleTime; // the maximum idle time after - // which a client may be disconnected - private int thresholdIdleConnections; // the number of idle connections - // after which we will start - // cleaning up idle - // connections - int maxConnectionsToNuke; // the max number of - // connections to nuke - //during a cleanup - private Configuration conf; private int timeout; @@ -90,12 +73,6 @@ private LinkedList callQueue = new LinkedList(); // queued calls private Object callDequeued = new Object(); // used by wait/notify - private InetSocketAddress address; //the address we bind at - private ServerSocketChannel acceptChannel = null; //the (main) accept channel - private Selector selector = null; //the selector that we use for the server - private Listener listener; - private int numConnections = 0; - /** A call queued for handling. */ private static class Call { private int id; // the client's call id @@ -109,300 +86,113 @@ } } - /** Listens on the socket. Creates jobs for the handler threads*/ + /** Listens on the socket, starting new connection threads. 
*/ private class Listener extends Thread { - - private LinkedList connectionList = new LinkedList(); //maintain a list - //of client connectionss - private Random rand = new Random(); - private long lastCleanupRunTime = 0; //the last time when a cleanup connec- - //-tion (for idle connections) ran - private int cleanupInterval = 10000; //the minimum interval between - //two cleanup runs - - public Listener() { - address = new InetSocketAddress(port); + private ServerSocket socket; + + public Listener() throws IOException { + this.socket = new ServerSocket(port); + socket.setSoTimeout(timeout); this.setDaemon(true); - } - /** cleanup connections from connectionList. Choose a random range - * to scan and also have a limit on the number of the connections - * that will be cleanedup per run. The criteria for cleanup is the time - * for which the connection was idle. If 'force' is true then all - * connections will be looked at for the cleanup. - */ - private void cleanupConnections(boolean force) { - if (force || numConnections > thresholdIdleConnections) { - long currentTime = System.currentTimeMillis(); - if (!force && (int)(currentTime - lastCleanupRunTime) < cleanupInterval) { - return; - } - int start = 0; - int end = numConnections - 1; - if (!force) { - start = rand.nextInt() % numConnections; - end = rand.nextInt() % numConnections; - int temp; - if (end < start) { - temp = start; - start = end; - end = temp; - } - } - int i = start; - int numNuked = 0; - while (i <= end) { - Connection c = (Connection)connectionList.get(i); - if (c.timedOut(currentTime)) { - connectionList.remove(i); - try { - LOG.info(getName() + ": disconnecting client " + c.getHostAddress()); - c.close(); - } catch (Exception e) {} - numNuked++; - end--; - if (!force && numNuked == maxConnectionsToNuke) break; - } - else i++; - } - lastCleanupRunTime = System.currentTimeMillis(); - } + this.setName("Server listener on port " + port); } public void run() { - SERVER.set(Server.this); - - try 
{ - // Create a new server socket and set to non blocking mode - acceptChannel = ServerSocketChannel.open(); - acceptChannel.configureBlocking(false); - - // Bind the server socket to the local host and port - acceptChannel.socket().bind(address); - - // create a selector; - selector= Selector.open(); - - // Register accepts on the server socket with the selector. - acceptChannel.register(selector, SelectionKey.OP_ACCEPT); - this.setName("Server listener on port " + port); - - LOG.info(getName() + ": starting"); - - while (running) { - SelectionKey key = null; + LOG.info(getName() + ": starting"); + while (running) { + Socket acceptedSock = null; + try { + acceptedSock = socket.accept(); + new Connection(acceptedSock).start(); // start a new connection + } catch (SocketTimeoutException e) { // ignore timeouts + } catch (OutOfMemoryError e) { + // we can run out of memory if we have too many threads + // log the event and sleep for a minute and give + // some thread(s) a chance to finish + LOG.warn(getName() + " out of memory, sleeping...", e); try { - selector.select(timeout); - Iterator iter = selector.selectedKeys().iterator(); - - while (iter.hasNext()) { - key = (SelectionKey)iter.next(); - if (key.isAcceptable()) - doAccept(key); - else if (key.isReadable()) - doRead(key); - iter.remove(); - key = null; - } - } catch (OutOfMemoryError e) { - closeCurrentConnection(key, e); - cleanupConnections(true); + acceptedSock.close(); Thread.sleep(60000); - } catch (Exception e) { - closeCurrentConnection(key, e); - } - cleanupConnections(false); - } - } catch (Exception e) { - LOG.fatal("selector",e); - } - LOG.info("Stopping " + this.getName()); - - try { - if (acceptChannel != null) - acceptChannel.close(); - if (selector != null) - selector.close(); - } catch (IOException e) { } - - selector= null; - acceptChannel= null; - connectionList = null; - } - - private void closeCurrentConnection(SelectionKey key, Throwable e) { - if (running) { - LOG.warn("selector: " + e); 
- e.printStackTrace(); - } - if (key != null) { - Connection c = (Connection)key.attachment(); - if (c != null) { - connectionList.remove(c); - try { - LOG.info(getName() + ": disconnecting client " + c.getHostAddress()); - c.close(); - } catch (Exception ex) {} + } catch (InterruptedException ie) { // ignore interrupts + } catch (IOException ioe) { // ignore IOexceptions + } } + catch (Exception e) { // log all other exceptions + LOG.info(getName() + " caught: " + e, e); + } } - } - - void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { - Connection c = null; - ServerSocketChannel server = (ServerSocketChannel) key.channel(); - SocketChannel channel = server.accept(); - channel.configureBlocking(false); - SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ); - c = new Connection(readKey, channel, System.currentTimeMillis()); - readKey.attach(c); - connectionList.addLast(c); - numConnections++; - LOG.info("Server connection on port " + port + " from " + - c.getHostAddress() + ": starting"); - } - - void doRead(SelectionKey key) { - int count = 0; - if (!key.isValid() || !key.isReadable()) - return; - Connection c = (Connection)key.attachment(); - if (c == null) { - return; - } - c.setLastContact(System.currentTimeMillis()); - try { - count = c.readAndProcess(); - } catch (Exception e) { - LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count); - count = -1; //so that the (count < 0) block is executed + socket.close(); + } catch (IOException e) { + LOG.info(getName() + ": e=" + e); } - if (count < 0) { - connectionList.remove(c); - try { - LOG.info(getName() + ": disconnecting client " + c.getHostAddress()); - c.close(); - } catch (Exception e) {} - } - else { - c.setLastContact(System.currentTimeMillis()); - } - } - - void doStop() - { - selector.wakeup(); - Thread.yield(); + LOG.info(getName() + ": exiting"); } } /** Reads calls from a connection and queues them for handling. 
*/ - private class Connection { - private SocketChannel channel; - private SelectionKey key; - private ByteBuffer data; - private ByteBuffer dataLengthBuffer; - private DataOutputStream out; - private long lastContact; - private int dataLength; + private class Connection extends Thread { private Socket socket; + private DataInputStream in; + private DataOutputStream out; - public Connection(SelectionKey key, SocketChannel channel, - long lastContact) { - this.key = key; - this.channel = channel; - this.lastContact = lastContact; - this.data = null; - this.dataLengthBuffer = null; - this.socket = channel.socket(); + public Connection(Socket socket) throws IOException { + this.socket = socket; + socket.setSoTimeout(timeout); + this.in = new DataInputStream + (new BufferedInputStream(socket.getInputStream())); this.out = new DataOutputStream - (new SocketChannelOutputStream(channel, 4096)); - } - - public String getHostAddress() { - return socket.getInetAddress().getHostAddress(); - } - - public void setLastContact(long lastContact) { - this.lastContact = lastContact; - } - - public long getLastContact() { - return lastContact; - } - - private boolean timedOut() { - if(System.currentTimeMillis() - lastContact > maxIdleTime) - return true; - return false; - } - - private boolean timedOut(long currentTime) { - if(currentTime - lastContact > timeout) - return true; - return false; - } - - public int readAndProcess() throws IOException, InterruptedException { - int count = -1; - if (dataLengthBuffer == null) - dataLengthBuffer = ByteBuffer.allocateDirect(4); - if (dataLengthBuffer.remaining() > 0) { - count = channel.read(dataLengthBuffer); - if (count < 0) return count; - if (dataLengthBuffer.remaining() == 0) { - dataLengthBuffer.flip(); - dataLength = dataLengthBuffer.getInt(); - data = ByteBuffer.allocateDirect(dataLength); - } - return count; - } - count = channel.read(data); - if (data.remaining() == 0) { - data.flip(); - processData(); - data = dataLengthBuffer = 
null; - } - return count; + (new BufferedOutputStream(socket.getOutputStream())); + this.setDaemon(true); + this.setName("Server connection on port " + port + " from " + + socket.getInetAddress().getHostAddress()); } - private void processData() throws IOException, InterruptedException { - byte[] bytes = new byte[dataLength]; - data.get(bytes); - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes)); - int id = dis.readInt(); // try to read an id + public void run() { + LOG.info(getName() + ": starting"); + SERVER.set(Server.this); + try { + while (running) { + int id; + try { + id = in.readInt(); // try to read an id + } catch (SocketTimeoutException e) { + continue; + } - if (LOG.isDebugEnabled()) - LOG.debug(" got #" + id); - - Writable param = makeParam(); // read param - param.readFields(dis); + if (LOG.isDebugEnabled()) + LOG.debug(getName() + " got #" + id); - Call call = new Call(id, param, this); - synchronized (callQueue) { - callQueue.addLast(call); // queue the call - callQueue.notify(); // wake up a waiting handler - } + Writable param = makeParam(); // read param + param.readFields(in); + + Call call = new Call(id, param, this); + + synchronized (callQueue) { + callQueue.addLast(call); // queue the call + callQueue.notify(); // wake up a waiting handler + } - while (running && callQueue.size() >= maxQueuedCalls) { - synchronized (callDequeued) { // queue is full - callDequeued.wait(timeout); // wait for a dequeue + while (running && callQueue.size() >= maxQueuedCalls) { + synchronized (callDequeued) { // queue is full + callDequeued.wait(timeout); // wait for a dequeue + } + } } + } catch (EOFException eof) { + // This is what happens on linux when the other side shuts down + } catch (SocketException eof) { + // This is what happens on Win32 when the other side shuts down + } catch (Exception e) { + LOG.info(getName() + " caught: " + e, e); + } finally { + try { + socket.close(); + } catch (IOException e) {} + LOG.info(getName() + 
": exiting"); } } - private void close() throws IOException { - data = null; - dataLengthBuffer = null; - if (!channel.isOpen()) - return; - socket.shutdownOutput(); - channel.close(); - socket.close(); - channel.close(); - out.close(); - key.cancel(); - numConnections--; - } } /** Handles queued calls . */ @@ -455,6 +245,7 @@ WritableUtils.writeString(out, errorClass); WritableUtils.writeString(out, error); } + out.flush(); } } catch (Exception e) { @@ -484,10 +275,7 @@ this.paramClass = paramClass; this.handlerCount = handlerCount; this.maxQueuedCalls = handlerCount; - this.timeout = conf.getInt("ipc.client.timeout",10000); - this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000); - this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10); - this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 120000); + this.timeout = conf.getInt("ipc.client.timeout",10000); } /** Sets the timeout used for network i/o. */ @@ -495,7 +283,7 @@ /** Starts the service. Must be called before any calls will be handled. */ public synchronized void start() throws IOException { - listener = new Listener(); + Listener listener = new Listener(); listener.start(); for (int i = 0; i < handlerCount; i++) { @@ -510,7 +298,6 @@ public synchronized void stop() { LOG.info("Stopping server on " + port); running = false; - listener.doStop(); try { Thread.sleep(timeout); // inexactly wait for pending requests to finish } catch (InterruptedException e) {}