Author: rangadi
Date: Tue Oct 28 21:47:51 2008
New Revision: 708773
URL: http://svn.apache.org/viewvc?rev=708773&view=rev
Log:
HADOOP-4346. Implement blocking connect so that Hadoop is not affected
by selector problem with JDK default implementation. (Raghu Angadi)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java
hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java
hadoop/core/trunk/src/core/org/apache/hadoop/net/SocketIOWithTimeout.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=708773&r1=708772&r2=708773&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Oct 28 21:47:51 2008
@@ -80,6 +80,9 @@
HADOOP-4482. Make the JMX monitoring use predictable names for the
datanodes to enable Nagios monitoring. (Brian Bockelman via omalley)
+ HADOOP-4346. Implement blocking connect so that Hadoop is not affected
+ by selector problem with JDK default implementation. (Raghu Angadi)
+
Release 0.19.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java?rev=708773&r1=708772&r2=708773&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java Tue Oct 28 21:47:51 2008
@@ -296,7 +296,7 @@
this.socket = socketFactory.createSocket();
this.socket.setTcpNoDelay(tcpNoDelay);
// connection time out is 20s
- this.socket.connect(remoteId.getAddress(), 20000);
+ NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
this.socket.setSoTimeout(pingInterval);
break;
} catch (SocketTimeoutException toe) {
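
The change above matters because sockets from socketFactory are typically channel-backed, and for those the JDK's timed Socket#connect goes through its internal selector machinery. A minimal sketch of how such a socket arises (illustrative; Hadoop's StandardSocketFactory creates sockets along these lines, but the class below is hypothetical):

import java.io.IOException;
import java.net.Socket;
import java.nio.channels.SocketChannel;

class ChannelBackedSocketExample {
  static Socket newSocket() throws IOException {
    // socket() wraps the channel in a java.net.Socket whose getChannel()
    // is non-null, so NetUtils.connect() takes the selector path instead
    // of delegating to Socket#connect(endpoint, timeout).
    return SocketChannel.open().socket();
  }
}
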
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java?rev=708773&r1=708772&r2=708773&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java Tue Oct 28 21:47:51 2008
@@ -23,8 +23,10 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.SocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
import java.util.Map.Entry;
import java.util.*;
@@ -369,6 +371,40 @@
socket.getOutputStream() : new SocketOutputStream(socket, timeout);
}
+ /**
+ * This is a drop-in replacement for
+ * {@link Socket#connect(SocketAddress, int)}.
+ * In the case of normal sockets that don't have associated channels, this
+ * just invokes <code>socket.connect(endpoint, timeout)</code>. If
+ * <code>socket.getChannel()</code> returns a non-null channel,
+ * connect is implemented using Hadoop's selectors. This is done mainly
+ * to prevent Sun's connect implementation from creating thread-local
+ * selectors, since Hadoop has no control over when those selectors are
+ * closed and they could end up consuming all available file descriptors.
+ *
+ * @see java.net.Socket#connect(java.net.SocketAddress, int)
+ *
+ * @param socket
+ * @param endpoint
+ * @param timeout - timeout in milliseconds
+ */
+ public static void connect(Socket socket,
+ SocketAddress endpoint,
+ int timeout) throws IOException {
+ if (socket == null || endpoint == null || timeout < 0) {
+ throw new IllegalArgumentException("Illegal argument for connect()");
+ }
+
+ SocketChannel ch = socket.getChannel();
+
+ if (ch == null) {
+ // let the default implementation handle it.
+ socket.connect(endpoint, timeout);
+ } else {
+ SocketIOWithTimeout.connect(ch, endpoint, timeout);
+ }
+ }
+
/**
* Given a string representation of a host, return its ip address
* in textual presentation.
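
A minimal usage sketch for the new helper (host, port, and timeout are illustrative):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import org.apache.hadoop.net.NetUtils;

public class NetUtilsConnectExample {
  public static void main(String[] args) throws IOException {
    // A channel-backed socket, so NetUtils.connect() uses Hadoop's selectors.
    Socket sock = SocketChannel.open().socket();
    try {
      // Same contract as sock.connect(endpoint, 20000); as with
      // Socket#connect, a timeout of 0 waits indefinitely.
      NetUtils.connect(sock, new InetSocketAddress("localhost", 8020), 20000);
    } finally {
      sock.close();
    }
  }
}
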
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/net/SocketIOWithTimeout.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/net/SocketIOWithTimeout.java?rev=708773&r1=708772&r2=708773&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/net/SocketIOWithTimeout.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/net/SocketIOWithTimeout.java Tue Oct 28 21:47:51 2008
@@ -20,11 +20,13 @@
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.LinkedList;
@@ -159,7 +161,8 @@
}
if (count == 0) {
- throw new SocketTimeoutException(timeoutExceptionString(ops));
+ throw new SocketTimeoutException(timeoutExceptionString(channel,
+ timeout, ops));
}
// otherwise the socket should be ready for io.
}
@@ -168,6 +171,64 @@
}
/**
+ * The contract is similar to {@link SocketChannel#connect(SocketAddress)}
+ * with a timeout.
+ *
+ * @see SocketChannel#connect(SocketAddress)
+ *
+ * @param channel - this should be a {@link SelectableChannel}
+ * @param endpoint
+ * @throws IOException
+ */
+ static void connect(SocketChannel channel,
+ SocketAddress endpoint, int timeout) throws IOException {
+
+ boolean blockingOn = channel.isBlocking();
+ if (blockingOn) {
+ channel.configureBlocking(false);
+ }
+
+ try {
+ if (channel.connect(endpoint)) {
+ return;
+ }
+
+ long timeoutLeft = timeout;
+ long endTime = (timeout > 0) ? (System.currentTimeMillis() + timeout): 0;
+
+ while (true) {
+ // we might have to call finishConnect() more than once
+ // for some channels (with user level protocols)
+
+ int ret = selector.select((SelectableChannel)channel,
+ SelectionKey.OP_CONNECT, timeoutLeft);
+
+ if (ret > 0 && channel.finishConnect()) {
+ return;
+ }
+
+ if (ret == 0 ||
+ (timeout > 0 &&
+ (timeoutLeft = (endTime - System.currentTimeMillis())) <= 0)) {
+ throw new SocketTimeoutException(
+ timeoutExceptionString(channel, timeout,
+ SelectionKey.OP_CONNECT));
+ }
+ }
+ } catch (IOException e) {
+ // javadoc for SocketChannel.connect() says channel should be closed.
+ try {
+ channel.close();
+ } catch (IOException ignored) {}
+ throw e;
+ } finally {
+ if (blockingOn && channel.isOpen()) {
+ channel.configureBlocking(true);
+ }
+ }
+ }
+
+ /**
* This is similar to {@link #doIO(ByteBuffer, int)} except that it
* does not perform any I/O. It just waits for the channel to be ready
* for I/O as specified in ops.
@@ -182,17 +243,28 @@
void waitForIO(int ops) throws IOException {
if (selector.select(channel, ops, timeout) == 0) {
- throw new SocketTimeoutException(timeoutExceptionString(ops));
+ throw new SocketTimeoutException(timeoutExceptionString(channel, timeout,
+ ops));
}
}
-
- private String timeoutExceptionString(int ops) {
- String waitingFor = "" + ops;
- if (ops == SelectionKey.OP_READ) {
- waitingFor = "read";
- } else if (ops == SelectionKey.OP_WRITE) {
- waitingFor = "write";
+ private static String timeoutExceptionString(SelectableChannel channel,
+ long timeout, int ops) {
+
+ String waitingFor;
+ switch(ops) {
+
+ case SelectionKey.OP_READ :
+ waitingFor = "read"; break;
+
+ case SelectionKey.OP_WRITE :
+ waitingFor = "write"; break;
+
+ case SelectionKey.OP_CONNECT :
+ waitingFor = "connect"; break;
+
+ default :
+ waitingFor = "" + ops;
}
return timeout + " millis timeout while " +
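
For readers new to the pattern, the connect() added above is the standard non-blocking connect loop: put the channel in non-blocking mode, initiate the connect, then select on OP_CONNECT until finishConnect() succeeds or the deadline passes. A self-contained sketch of the same idea using a raw JDK Selector, assuming a positive timeout (the patch itself borrows cached selectors from its SelectorPool rather than opening one per call, which is precisely what avoids the file-descriptor problem):

import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class NonBlockingConnectSketch {
  static void connect(SocketChannel ch, SocketAddress addr, long timeout)
      throws IOException {
    ch.configureBlocking(false);
    try {
      if (ch.connect(addr)) {
        return; // connected immediately (common on loopback)
      }
      Selector sel = Selector.open(); // sketch only: one selector per call
      try {
        ch.register(sel, SelectionKey.OP_CONNECT);
        long end = System.currentTimeMillis() + timeout;
        while (true) {
          long left = end - System.currentTimeMillis();
          // select() returning 0 means nothing became ready in time.
          if (left <= 0 || sel.select(left) == 0) {
            throw new SocketTimeoutException(
                timeout + " millis timeout while connecting");
          }
          sel.selectedKeys().clear();
          // finishConnect() may need more than one call for some channels.
          if (ch.finishConnect()) {
            return;
          }
        }
      } finally {
        sel.close();
      }
    } finally {
      // Like the patch, restore blocking mode if the channel survived;
      // the patch additionally closes the channel on IOException, per
      // the SocketChannel#connect javadoc.
      if (ch.isOpen()) {
        ch.configureBlocking(true);
      }
    }
  }
}
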
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=708773&r1=708772&r2=708773&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Tue Oct 28 21:47:51 2008
@@ -619,7 +619,9 @@
for(int j = 0; !done && j < datanodes.length; j++) {
//connect to a datanode
final Socket sock = socketFactory.createSocket();
- sock.connect(NetUtils.createSocketAddr(datanodes[j].getName()), timeout);
+ NetUtils.connect(sock,
+ NetUtils.createSocketAddr(datanodes[j].getName()),
+ timeout);
sock.setSoTimeout(timeout);
DataOutputStream out = new DataOutputStream(
@@ -1531,7 +1533,7 @@
try {
s = socketFactory.createSocket();
- s.connect(targetAddr, socketTimeout);
+ NetUtils.connect(s, targetAddr, socketTimeout);
s.setSoTimeout(socketTimeout);
Block blk = targetBlock.getBlock();
@@ -1733,7 +1735,7 @@
try {
dn = socketFactory.createSocket();
- dn.connect(targetAddr, socketTimeout);
+ NetUtils.connect(dn, targetAddr, socketTimeout);
dn.setSoTimeout(socketTimeout);
int len = (int) (end - start + 1);
@@ -2737,7 +2739,7 @@
InetSocketAddress target =
NetUtils.createSocketAddr(nodes[0].getName());
s = socketFactory.createSocket();
int timeoutValue = 3000 * nodes.length + socketTimeout;
- s.connect(target, timeoutValue);
+ NetUtils.connect(s, target, timeoutValue);
s.setSoTimeout(timeoutValue);
s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
LOG.debug("Send buf size " + s.getSendBufferSize());
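
Only the connect calls change in this file; the timeout arithmetic is untouched. For the pipeline setup above, timeoutValue = 3000 * nodes.length + socketTimeout, so with a 3-node pipeline and a socketTimeout of 60000 ms (the usual HDFS read timeout, cited purely for illustration) the connect can wait up to 3000*3 + 60000 = 69000 ms.
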
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=708773&r1=708772&r2=708773&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Oct 28 21:47:51 2008
@@ -1033,7 +1033,7 @@
InetSocketAddress curTarget =
NetUtils.createSocketAddr(targets[0].getName());
sock = newSocket();
- sock.connect(curTarget, socketTimeout);
+ NetUtils.connect(sock, curTarget, socketTimeout);
sock.setSoTimeout(targets.length * socketTimeout);
long writeTimeout = socketWriteTimeout +
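
Note the scaling here: the connect uses the single-hop socketTimeout, while the read timeout is targets.length * socketTimeout, since a response must traverse the whole remaining pipeline. With 3 targets and an illustrative 60000 ms socketTimeout, reads wait up to 180000 ms.
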
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=708773&r1=708772&r2=708773&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Oct 28 21:47:51 2008
@@ -278,7 +278,7 @@
int timeoutValue = numTargets * datanode.socketTimeout;
int writeTimeout = datanode.socketWriteTimeout + (HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);
- mirrorSock.connect(mirrorTarget, timeoutValue);
+ NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
mirrorSock.setSoTimeout(timeoutValue);
mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
mirrorOut = new DataOutputStream(
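
The write-path timeouts in this hunk scale the same way: timeoutValue = numTargets * datanode.socketTimeout for reads, and writeTimeout grows by HdfsConstants.WRITE_TIMEOUT_EXTENSION per downstream target; assuming the then-current extension of 5000 ms (an assumption, not stated in this patch), two targets add 10000 ms to the base socketWriteTimeout.
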
@@ -557,7 +557,7 @@
InetSocketAddress proxyAddr = NetUtils.createSocketAddr(
proxySource.getName());
proxySock = datanode.newSocket();
- proxySock.connect(proxyAddr, datanode.socketTimeout);
+ NetUtils.connect(proxySock, proxyAddr, datanode.socketTimeout);
proxySock.setSoTimeout(datanode.socketTimeout);
OutputStream baseStream = NetUtils.getOutputStream(proxySock,