Author: dhruba Date: Wed Dec 12 17:04:47 2007 New Revision: 603795 URL: http://svn.apache.org/viewvc?rev=603795&view=rev Log: HADOOP-1841. Prevent slow clients from consuming threads in the NameNode. (dhruba)
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=603795&r1=603794&r2=603795&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Wed Dec 12 17:04:47 2007 @@ -238,6 +238,9 @@ HADOOP-2359. Remove warning for interruptted exception when closing down minidfs. (dhruba via omalley) + HADOOP-1841. Prevent slow clients from consuming threads in the NameNode. + (dhruba) + Branch 0.15 (unreleased) BUG FIXES Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=603795&r1=603794&r2=603795&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Dec 12 17:04:47 2007 @@ -73,6 +73,7 @@ private int maxRetries; //the max. no. of retries for socket connections private Thread connectionCullerThread; private SocketFactory socketFactory; // how to create sockets + private boolean simulateError = false; // unit tests /** A call waiting for a value. 
*/ private class Call { @@ -286,6 +287,7 @@ } else { Writable value = (Writable)ReflectionUtils.newInstance(valueClass, conf); try { + waitForEndSimulation(); readingCall = call; value.readFields(in); // read value } finally { @@ -609,4 +611,19 @@ return address.hashCode() ^ System.identityHashCode(ticket); } } + + void simulateError(boolean flag) { + simulateError = flag; + } + + // If errors are being simulated, then wait. + private void waitForEndSimulation() { + while (simulateError) { + try { + LOG.info("RPC Client waiting for simulation to end"); + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=603795&r1=603794&r2=603795&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Wed Dec 12 17:04:47 2007 @@ -30,6 +30,7 @@ import java.io.*; import java.util.Map; import java.util.HashMap; +import java.util.Collection; import javax.net.SocketFactory; @@ -168,6 +169,17 @@ CLIENTS.clear(); } + /* + * remove all clients from the list of clients.
+ */ + static synchronized void removeClients() { + CLIENTS.clear(); + } + + static synchronized Collection allClients() { + return CLIENTS.values(); + } + private static class Invoker implements InvocationHandler { private InetSocketAddress address; private UserGroupInformation ticket; @@ -415,5 +427,4 @@ value = value.substring(0, 55)+"..."; LOG.info(value); } - } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=603795&r1=603794&r2=603795&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed Dec 12 17:04:47 2007 @@ -23,6 +23,7 @@ import java.io.DataOutputStream; import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; @@ -44,12 +45,13 @@ import java.util.Iterator; import java.util.Random; -import org.apache.commons.logging.*; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.ObjectWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.ipc.SocketChannelOutputStream; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.*; @@ -145,6 +147,7 @@ private int timeout; private long maxCallStartAge; private int maxQueueSize; + private int socketSendBufferSize; volatile private boolean running = true; // true while server runs private LinkedList<Call> callQueue = new LinkedList<Call>(); // queued calls @@ -154,6 +157,7 @@ //maintain a list //of client connections private Listener listener = null; + 
private Responder responder = null; private int numConnections = 0; private Handler[] handlers = null; @@ -191,17 +195,24 @@ private Writable param; // the parameter passed private Connection connection; // connection to client private long receivedTime; // the time received + private ByteBuffer response; // the response for this call public Call(int id, Writable param, Connection connection) { this.id = id; this.param = param; this.connection = connection; this.receivedTime = System.currentTimeMillis(); + this.response = null; } + @Override public String toString() { return param.toString() + " from " + connection.toString(); } + + public void setResponse(ByteBuffer response) { + this.response = response; + } } /** Listens on the socket. Creates jobs for the handler threads*/ @@ -288,6 +299,7 @@ } } + @Override public void run() { LOG.info(getName() + ": starting"); SERVER.set(Server.this); @@ -428,6 +440,234 @@ } } + // Sends responses of RPC back to clients. + private class Responder extends Thread { + private Selector writeSelector; + private boolean pending; // call waiting to be enqueued + + Responder() throws IOException { + this.setName("IPC Server Responder"); + this.setDaemon(true); + writeSelector = Selector.open(); // create a selector + pending = false; + } + + @Override + public void run() { + LOG.info(getName() + ": starting"); + SERVER.set(Server.this); + long lastPurgeTime = 0; // last check for old calls. + + while (running) { + SelectionKey key = null; + try { + waitPending(); // If a channel is being registered, wait. 
+ writeSelector.select(maxCallStartAge); + Iterator iter = writeSelector.selectedKeys().iterator(); + while (iter.hasNext()) { + key = (SelectionKey)iter.next(); + iter.remove(); + try { + if (key.isValid() && key.isWritable()) { + doAsyncWrite(key); + } + } catch (IOException e) { + LOG.info(getName() + ": doAsyncWrite threw exception " + e); + key.cancel(); + } + key = null; + } + long now = System.currentTimeMillis(); + if (now < lastPurgeTime + maxCallStartAge) { + continue; + } + lastPurgeTime = now; + // + // If there were some calls that have not been sent out for a + // long time, discard them. + // + LOG.debug("Checking for old call responses."); + iter = writeSelector.keys().iterator(); + while (iter.hasNext()) { + key = (SelectionKey)iter.next(); + try { + doPurge(key, now); + } catch (IOException e) { + LOG.warn("Error in purging old calls " + e); + } + } + } catch (OutOfMemoryError e) { + // + // we can run out of memory if we have too many threads + // log the event and sleep for a minute and give + // some thread(s) a chance to finish + // + LOG.warn("Out of Memory in server select", e); + try { Thread.sleep(60000); } catch (Exception ie) {} + } catch (Exception e) { + LOG.warn("Exception in Responder " + e); + } + } + LOG.info("Stopping " + this.getName()); + } + + private void doAsyncWrite(SelectionKey key) throws IOException { + Call call = (Call)key.attachment(); + if (call == null) { + return; + } + if (key.channel() != call.connection.channel) { + throw new IOException("doAsyncWrite: bad channel"); + } + if (processResponse(call.connection.responseQueue)) { + key.cancel(); // remove item from selector. + } + } + + // + // Remove calls that have been pending in the responseQueue + // for a long time. 
+ // + private void doPurge(SelectionKey key, long now) throws IOException { + Call call = (Call)key.attachment(); + if (call == null) { + return; + } + if (key.channel() != call.connection.channel) { + LOG.info("doPurge: bad channel"); + return; + } + LinkedList<Call> responseQueue = call.connection.responseQueue; + synchronized (responseQueue) { + Iterator iter = responseQueue.listIterator(0); + while (iter.hasNext()) { + call = (Call)iter.next(); + if (now > call.receivedTime + maxCallStartAge) { + LOG.info(getName() + ", call " + call + + ": response discarded for being too old (" + + (now - call.receivedTime) + ")"); + iter.remove(); + } + } + + // If all the calls for this channel were removed, then + // remove this channel from the selector + if (responseQueue.size() == 0) { + key.cancel(); + } + } + } + + // Processes one response. Returns true if there is no more pending + // data for this channel. + // + private boolean processResponse(LinkedList<Call> responseQueue) throws IOException { + boolean error = true; + boolean done = false; // there is more data for this channel. + int numElements = 0; + Call call = null; + try { + synchronized (responseQueue) { + // + // If there are no items for this channel, then we are done + // + numElements = responseQueue.size(); + if (numElements == 0) { + error = false; + return true; // no more data for this channel. + } + // + // Extract the first call + // + int numBytes = 0; + call = responseQueue.removeFirst(); + SocketChannel channel = call.connection.channel; + if (LOG.isDebugEnabled()) { + LOG.debug(getName() + ": responding to #" + call.id + " from " + + call.connection); + } + // + // Send as much data as we can in the non-blocking fashion + // + numBytes = channel.write(call.response); + if (!call.response.hasRemaining()) { + if (numElements == 1) { // last call fully processed. + done = true; // no more data for this channel. + } else { + done = false; // more calls pending to be sent.
+ } + if (LOG.isDebugEnabled()) { + LOG.debug(getName() + ": responding to #" + call.id + " from " + + call.connection + " Wrote " + numBytes + " bytes."); + } + } else { + // + // If we were unable to write the entire response out, then + // insert in Selector queue. + // + call.connection.responseQueue.addFirst(call); + setPending(); + try { + // Wakeup the thread blocked on select, only then can the call + // to channel.register() complete. + writeSelector.wakeup(); + SelectionKey readKey = channel.register(writeSelector, + SelectionKey.OP_WRITE); + readKey.attach(call); + } finally { + clearPending(); + } + if (LOG.isDebugEnabled()) { + LOG.debug(getName() + ": responding to #" + call.id + " from " + + call.connection + " Wrote partial " + numBytes + + " bytes."); + } + done = false; // this call not fully processed. + } + error = false; // everything went off well + } + } finally { + if (error && call != null) { + LOG.warn(getName()+", call " + call + ": output error"); + done = true; // error. no more data for this channel. + synchronized (connectionList) { + if (connectionList.remove(call.connection)) + numConnections--; + } + call.connection.close(); + } + } + return done; + } + + // + // Enqueue a response from the application. + // + void doRespond(Call call) throws IOException { + synchronized (call.connection.responseQueue) { + call.connection.responseQueue.addLast(call); + if (call.connection.responseQueue.size() == 1) { + processResponse(call.connection.responseQueue); + } + } + } + + private synchronized void setPending() { // call waiting to be enqueued. + pending = true; + } + + private synchronized void clearPending() { // call done enqueueing. + pending = false; + notify(); + } + + private synchronized void waitPending() throws InterruptedException { + while (pending) { + wait(); + } + } + } + /** Reads calls from a connection and queues them for handling. 
*/ private class Connection { private boolean versionRead = false; //if initial signature and @@ -438,8 +678,7 @@ private SelectionKey key; private ByteBuffer data; private ByteBuffer dataLengthBuffer; - private DataOutputStream out; - private SocketChannelOutputStream channelOut; + private LinkedList<Call> responseQueue; private long lastContact; private int dataLength; private Socket socket; @@ -457,9 +696,6 @@ this.data = null; this.dataLengthBuffer = ByteBuffer.allocate(4); this.socket = channel.socket(); - this.out = new DataOutputStream - (new BufferedOutputStream( - this.channelOut = new SocketChannelOutputStream(channel))); InetAddress addr = socket.getInetAddress(); if (addr == null) { this.hostAddress = "*Unknown*"; @@ -467,8 +703,18 @@ this.hostAddress = addr.getHostAddress(); } this.remotePort = socket.getPort(); + this.responseQueue = new LinkedList<Call>(); + if (socketSendBufferSize != 0) { + try { + socket.setSendBufferSize(socketSendBufferSize); + } catch (IOException e) { + LOG.warn("Connection: unable to set socket send buffer size to " + + socketSendBufferSize); + } + } } + @Override public String toString() { return getHostAddress() + ":" + remotePort; } @@ -516,7 +762,9 @@ if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) { //Warning is ok since this is not supposed to happen. 
LOG.warn("Incorrect header or version mismatch from " + - hostAddress + ":" + remotePort); + hostAddress + ":" + remotePort + + " got version " + version + + " expected version " + CURRENT_VERSION); return -1; } dataLengthBuffer.clear(); @@ -589,8 +837,6 @@ if (!channel.isOpen()) return; try {socket.shutdownOutput();} catch(Exception e) {} - try {out.close();} catch(Exception e) {} - try {channelOut.destroy();} catch(Exception e) {} if (channel.isOpen()) { try {channel.close();} catch(Exception e) {} } @@ -607,9 +853,11 @@ this.setName("IPC Server handler "+ instanceNumber + " on " + port); } + @Override public void run() { LOG.info(getName() + ": starting"); SERVER.set(Server.this); + ByteArrayOutputStream buf = new ByteArrayOutputStream(10240); while (running) { try { Call call; @@ -648,28 +896,20 @@ error = StringUtils.stringifyException(e); } CurCall.set(null); - - DataOutputStream out = call.connection.out; - synchronized (out) { - try { - out.writeInt(call.id); // write call id - out.writeBoolean(error!=null); // write error flag - if (error == null) { - value.write(out); - } else { - WritableUtils.writeString(out, errorClass); - WritableUtils.writeString(out, error); - } - out.flush(); - } catch (Exception e) { - LOG.warn(getName()+", call "+call+": output error", e); - synchronized (connectionList) { - if (connectionList.remove(call.connection)) - numConnections--; - } - call.connection.close(); - } + + buf.reset(); + DataOutputStream out = new DataOutputStream(buf); + out.writeInt(call.id); // write call id + out.writeBoolean(error != null); // write error flag + + if (error == null) { + value.write(out); + } else { + WritableUtils.writeString(out, errorClass); + WritableUtils.writeString(out, error); } + call.setResponse(ByteBuffer.wrap(buf.toByteArray())); + responder.doRespond(call); } catch (InterruptedException e) { if (running) { // unexpected -- log it LOG.info(getName() + " caught: " + e, e); @@ -695,6 +935,7 @@ this.paramClass = paramClass; 
this.handlerCount = handlerCount; this.timeout = conf.getInt("ipc.client.timeout", 10000); + this.socketSendBufferSize = 0; maxCallStartAge = (long) (timeout * MAX_CALL_QUEUE_TIME); maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER; this.maxIdleTime = conf.getInt("ipc.client.maxidletime", 120000); @@ -704,13 +945,20 @@ // Start the listener here and let it bind to the port listener = new Listener(); this.port = listener.getAddress().getPort(); + + // Create the responder here + responder = new Responder(); } /** Sets the timeout used for network i/o. */ public void setTimeout(int timeout) { this.timeout = timeout; } + /** Sets the socket buffer size used for responding to RPCs */ + public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; } + /** Starts the service. Must be called before any calls will be handled. */ public synchronized void start() throws IOException { + responder.start(); listener.start(); handlers = new Handler[handlerCount]; @@ -733,6 +981,7 @@ } listener.interrupt(); listener.doStop(); + responder.interrupt(); notifyAll(); } Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java?rev=603795&r1=603794&r2=603795&view=diff ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java Wed Dec 12 17:04:47 2007 @@ -25,6 +25,8 @@ import junit.framework.TestCase; import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; import org.apache.commons.logging.*; @@ -43,6 +45,9 @@ private static Configuration conf = new Configuration(); + int datasize = 1024*100; + int numThreads = 50; + public TestRPC(String name) { super(name); } public interface TestProtocol extends VersionedProtocol { @@ -56,6 +61,7 @@ int add(int[] values) 
throws IOException; int error() throws IOException; void testServerGet() throws IOException; + int[] exchange(int[] values) throws IOException; } public class TestImpl implements TestProtocol { @@ -95,8 +101,117 @@ } } + public int[] exchange(int[] values) { + int sum = 0; + for (int i = 0; i < values.length; i++) { + values[i] = i; + } + return values; + } + } + + // + // an object that does a bunch of transactions + // + static class Transactions implements Runnable { + int datasize; + TestProtocol proxy; + + Transactions(TestProtocol proxy, int datasize) { + this.proxy = proxy; + this.datasize = datasize; + } + + // Do two RPCs that transfer data. + public void run() { + int[] indata = new int[datasize]; + int[] outdata = null; + int val = 0; + try { + outdata = proxy.exchange(indata); + val = proxy.add(1,2); + } catch (IOException e) { + assertTrue("Exception from RPC exchange() " + e, false); + } + assertEquals(indata.length, outdata.length); + assertEquals(val, 3); + for (int i = 0; i < outdata.length; i++) { + assertEquals(outdata[i], i); + } + } + } + + // + // A class that does an RPC but does not read its response.
+ // + static class SlowRPC implements Runnable { + private TestProtocol proxy; + private volatile boolean done; + + SlowRPC(TestProtocol proxy) { + this.proxy = proxy; + done = false; + } + + boolean isDone() { + return done; + } + + public void run() { + try { + proxy.ping(); // this would hang until simulateError is false + done = true; + } catch (IOException e) { + assertTrue("SlowRPC ping exception " + e, false); + } + } + } + + public void testSlowRpc() throws Exception { + System.out.println("Testing Slow RPC"); + Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf); + server.start(); + + InetSocketAddress addr = server.getListenerAddress(); + + // create a client and make an RPC that does not read its response + // + TestProtocol proxy1 = + (TestProtocol)RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf); + Collection collection = RPC.allClients(); + assertTrue("There should be only one client.", collection.size() == 1); + Iterator iter = collection.iterator(); + Client client = (Client) iter.next(); + + client.simulateError(true); + RPC.removeClients(); + SlowRPC slowrpc = new SlowRPC(proxy1); + Thread thread = new Thread(slowrpc, "SlowRPC"); + thread.start(); + assertTrue("Slow RPC should not have finished1.", !slowrpc.isDone()); + + // create another client and make another RPC to the same server. This + // should complete even though the first one is still hanging. + // + TestProtocol proxy2 = + (TestProtocol)RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf); + proxy2.ping(); + + // verify that the first RPC is still stuck + assertTrue("Slow RPC should not have finished2.", !slowrpc.isDone()); + + // Make the first RPC process its response. 
+ client.simulateError(false); + while (!slowrpc.isDone()) { + System.out.println("Waiting for slow RPC to get done."); + try { + Thread.sleep(1000); + } catch (InterruptedException e) {} + } + server.stop(); } + public void testCalls() throws Exception { Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf); server.start(); @@ -142,6 +257,26 @@ proxy.testServerGet(); + // create multiple threads and make them do large data transfers + System.out.println("Starting multi-threaded RPC test..."); + server.setSocketSendBufSize(1024); + Thread threadId[] = new Thread[numThreads]; + for (int i = 0; i < numThreads; i++) { + Transactions trans = new Transactions(proxy, datasize); + threadId[i] = new Thread(trans, "TransactionThread-" + i); + threadId[i].start(); + } + + // wait for all transaction threads to finish + System.out.println("Waiting for all threads to finish RPCs..."); + for (int i = 0; i < numThreads; i++) { + try { + threadId[i].join(); + } catch (InterruptedException e) { + i--; // retry + } + } + // try some multi-calls Method echo = TestProtocol.class.getMethod("echo", new Class[] { String.class }); @@ -161,5 +296,4 @@ new TestRPC("test").testCalls(); } - }