Author: todd
Date: Thu May 26 07:48:03 2011
New Revision: 1127811
URL: http://svn.apache.org/viewvc?rev=1127811&view=rev
Log:
HADOOP-7146. RPC server leaks file descriptors. Contributed by Todd Lipcon.
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestIPC.java
Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=1127811&r1=1127810&r2=1127811&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Thu May 26 07:48:03 2011
@@ -732,6 +732,8 @@ Release 0.22.0 - Unreleased
HADOOP-7287. Configuration deprecation mechanism doesn't work properly for
GenericOptionsParser and Tools. (Aaron T. Myers via todd)
+ HADOOP-7146. RPC server leaks file descriptors (todd)
+
Release 0.21.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=1127811&r1=1127810&r2=1127811&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java Thu May 26 07:48:03 2011
@@ -322,9 +322,8 @@ public abstract class Server {
selector= Selector.open();
readers = new Reader[readThreads];
for (int i = 0; i < readThreads; i++) {
- Selector readSelector = Selector.open();
- Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port "
+ port,
- readSelector);
+ Reader reader = new Reader(
+ "Socket Reader #" + (i + 1) + " for port " + port);
readers[i] = reader;
reader.start();
}
@@ -337,42 +336,53 @@ public abstract class Server {
private class Reader extends Thread {
private volatile boolean adding = false;
- private Selector readSelector = null;
+ private final Selector readSelector;
- Reader(String name, Selector readSelector) {
+ Reader(String name) throws IOException {
super(name);
- this.readSelector = readSelector;
+
+ this.readSelector = Selector.open();
}
+
public void run() {
LOG.info("Starting " + getName());
- synchronized (this) {
- while (running) {
- SelectionKey key = null;
- try {
- readSelector.select();
- while (adding) {
- this.wait(1000);
- }
-
- Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
- while (iter.hasNext()) {
- key = iter.next();
- iter.remove();
- if (key.isValid()) {
- if (key.isReadable()) {
- doRead(key);
- }
+ try {
+ doRunLoop();
+ } finally {
+ try {
+ readSelector.close();
+ } catch (IOException ioe) {
+ LOG.error("Error closing read selector in " + this.getName(), ioe);
+ }
+ }
+ }
+
+ private synchronized void doRunLoop() {
+ while (running) {
+ SelectionKey key = null;
+ try {
+ readSelector.select();
+ while (adding) {
+ this.wait(1000);
+ }
+
+ Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
+ while (iter.hasNext()) {
+ key = iter.next();
+ iter.remove();
+ if (key.isValid()) {
+ if (key.isReadable()) {
+ doRead(key);
}
- key = null;
- }
- } catch (InterruptedException e) {
- if (running) { // unexpected -- log it
- LOG.info(getName() + " caught: " +
- StringUtils.stringifyException(e));
}
- } catch (IOException ex) {
- LOG.error("Error in Reader", ex);
+ key = null;
}
+ } catch (InterruptedException e) {
+ if (running) { // unexpected -- log it
+ LOG.info(getName() + " unexpectedly interrupted", e);
+ }
+ } catch (IOException ex) {
+ LOG.error("Error in Reader", ex);
}
}
}
@@ -614,7 +624,7 @@ public abstract class Server {
// Sends responses of RPC back to clients.
private class Responder extends Thread {
- private Selector writeSelector;
+ private final Selector writeSelector;
private int pending; // connections waiting to register
final static int PURGE_INTERVAL = 900000; // 15mins
@@ -630,6 +640,19 @@ public abstract class Server {
public void run() {
LOG.info(getName() + ": starting");
SERVER.set(Server.this);
+ try {
+ doRunLoop();
+ } finally {
+ LOG.info("Stopping " + this.getName());
+ try {
+ writeSelector.close();
+ } catch (IOException ioe) {
+ LOG.error("Couldn't close write selector in " + this.getName(), ioe);
+ }
+ }
+ }
+
+ private void doRunLoop() {
long lastPurgeTime = 0; // last check for old calls.
while (running) {
@@ -691,11 +714,9 @@ public abstract class Server {
LOG.warn("Out of Memory in server select", e);
try { Thread.sleep(60000); } catch (Exception ie) {}
} catch (Exception e) {
- LOG.warn("Exception in Responder " +
- StringUtils.stringifyException(e));
+ LOG.warn("Exception in Responder", e);
}
}
- LOG.info("Stopping " + this.getName());
}
private void doAsyncWrite(SelectionKey key) throws IOException {
@@ -1460,12 +1481,10 @@ public abstract class Server {
}
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
- LOG.info(getName() + " caught: " +
- StringUtils.stringifyException(e));
+ LOG.info(getName() + " unexpectedly interrupted", e);
}
} catch (Exception e) {
- LOG.info(getName() + " caught: " +
- StringUtils.stringifyException(e));
+ LOG.info(getName() + " caught an exception", e);
}
}
LOG.info(getName() + ": exiting");
Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestIPC.java?rev=1127811&r1=1127810&r2=1127811&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestIPC.java Thu May 26 07:48:03 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.net.NetUtils;
import java.util.Random;
import java.io.DataInput;
+import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
@@ -36,6 +37,7 @@ import junit.framework.TestCase;
import static org.mockito.Mockito.*;
import org.apache.hadoop.conf.Configuration;
+import org.junit.Assume;
/** Unit tests for IPC. */
public class TestIPC extends TestCase {
@@ -55,6 +57,9 @@ public class TestIPC extends TestCase {
private static final String ADDRESS = "0.0.0.0";
+ /** Directory where we can count open file descriptors on Linux */
+ private static final File FD_DIR = new File("/proc/self/fd");
+
private static class TestServer extends Server {
private boolean sleep;
@@ -354,6 +359,29 @@ public class TestIPC extends TestCase {
addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf);
}
+ /**
+ * Check that file descriptors aren't leaked by starting
+ * and stopping IPC servers.
+ */
+ public void testSocketLeak() throws Exception {
+ Assume.assumeTrue(FD_DIR.exists()); // only run on Linux
+
+ long startFds = countOpenFileDescriptors();
+ for (int i = 0; i < 50; i++) {
+ Server server = new TestServer(1, true);
+ server.start();
+ server.stop();
+ }
+ long endFds = countOpenFileDescriptors();
+
+ assertTrue("Leaked " + (endFds - startFds) + " file descriptors",
+ endFds - startFds < 20);
+ }
+
+ private long countOpenFileDescriptors() {
+ return FD_DIR.list().length;
+ }
+
public static void main(String[] args) throws Exception {
//new TestIPC("test").testSerial(5, false, 2, 10, 1000);