Author: shv
Date: Mon Jul 7 12:50:46 2008
New Revision: 674611
URL: http://svn.apache.org/viewvc?rev=674611&view=rev
Log:
HADOOP-3633. Merge -r 674593:674594 from branch 0.18 to branch 0.17.
Modified:
hadoop/core/branches/branch-0.17/CHANGES.txt
hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/dfs/DataNode.java
Modified: hadoop/core/branches/branch-0.17/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.17/CHANGES.txt?rev=674611&r1=674610&r2=674611&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.17/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.17/CHANGES.txt Mon Jul 7 12:50:46 2008
@@ -41,6 +41,9 @@
HADOOP-1979. Speed up fsck by adding a buffered stream. (Lohit
Vijaya Renu via omalley)
+ HADOOP-3633. Correct exception handling in DataXceiveServer, and throttle
+ the number of xceiver threads in a data-node. (shv)
+
Release 0.17.0 - 2008-05-18
INCOMPATIBLE CHANGES
Modified:
hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/dfs/DataNode.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/dfs/DataNode.java?rev=674611&r1=674610&r2=674611&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/branches/branch-0.17/src/java/org/apache/hadoop/dfs/DataNode.java Mon Jul 7 12:50:46 2008
@@ -154,10 +154,6 @@
return System.currentTimeMillis();
}
-
-
-
-
/**
* Create the DataNode given a configuration and an array of dataDirs.
* 'dataDirs' is where the blocks are stored.
@@ -502,8 +498,7 @@
this.threadGroup.interrupt();
LOG.info("Waiting for threadgroup to exit, active threads is " +
this.threadGroup.activeCount());
- if (this.threadGroup.isDestroyed() ||
- this.threadGroup.activeCount() == 0) {
+ if (this.threadGroup.activeCount() == 0) {
break;
}
try {
@@ -573,18 +568,18 @@
shutdown();
}
- private static class Count {
- int value = 0;
- Count(int init) { value = init; }
- synchronized void incr() { value++; }
- synchronized void decr() { value--; }
- @Override
- public String toString() { return Integer.toString(value); }
- public int getValue() { return value; }
+ /**
+ * Maximal number of concurrent xceivers per node.
+ * Enforcing the limit is required in order to avoid data-node
+ * running out of memory.
+ */
+ private final static int MAX_XCEIVER_COUNT = 256;
+
+ /** Number of concurrent xceivers per node. */
+ int getXceiverCount() {
+ return threadGroup == null ? 0 : threadGroup.activeCount();
}
- Count xceiverCount = new Count(0);
-
/**
* Main loop for the DataNode. Runs until shutdown,
* forever calling remote NameNode functions.
@@ -619,7 +614,7 @@
data.getDfsUsed(),
data.getRemaining(),
xmitsInProgress,
- xceiverCount.getValue());
+ getXceiverCount());
myMetrics.heartbeats.inc(now() - startTime);
//LOG.info("Just sent heartbeat, with name " + localName);
lastHeartbeat = startTime;
@@ -899,15 +894,25 @@
/**
*/
public void run() {
- try {
- while (shouldRun) {
+ while (shouldRun) {
+ try {
Socket s = ss.accept();
s.setTcpNoDelay(true);
new Daemon(threadGroup, new DataXceiver(s)).start();
+ } catch (IOException ie) {
+ LOG.warn(dnRegistration + ":DataXceiveServer: "
+ + StringUtils.stringifyException(ie));
+ } catch (Throwable te) {
+ LOG.error(dnRegistration + ":DataXceiveServer: Exiting due to:"
+ + StringUtils.stringifyException(te));
+ shouldRun = false;
}
+ }
+ try {
ss.close();
} catch (IOException ie) {
- LOG.info(dnRegistration + ":Exiting DataXceiveServer due to " + ie.toString());
+ LOG.warn(dnRegistration + ":DataXceiveServer: "
+ + StringUtils.stringifyException(ie));
}
}
public void kill() {
@@ -915,14 +920,16 @@
"shoudRun should be set to false before killing";
try {
this.ss.close();
- } catch (IOException iex) {
+ } catch (IOException ie) {
+ LOG.warn(dnRegistration + ":DataXceiveServer.kill(): "
+ + StringUtils.stringifyException(ie));
}
// close all the sockets that were accepted earlier
synchronized (childSockets) {
- for (Iterator it = childSockets.values().iterator();
+ for (Iterator<Socket> it = childSockets.values().iterator();
it.hasNext();) {
- Socket thissock = (Socket) it.next();
+ Socket thissock = it.next();
try {
thissock.close();
} catch (IOException e) {
@@ -945,7 +952,7 @@
InetSocketAddress isock = (InetSocketAddress)s.getRemoteSocketAddress();
remoteAddress = isock.toString();
localAddress = s.getInetAddress() + ":" + s.getLocalPort();
- LOG.debug("Number of active connections is: "+xceiverCount);
+ LOG.debug("Number of active connections is: " + getXceiverCount());
}
/**
@@ -962,6 +969,13 @@
}
boolean local = s.getInetAddress().equals(s.getLocalAddress());
byte op = in.readByte();
+ // Make sure the xciver count is not exceeded
+ int curXceiverCount = getXceiverCount();
+ if(curXceiverCount > MAX_XCEIVER_COUNT) {
+ throw new IOException("xceiverCount " + curXceiverCount
+ + " exceeds the limit of concurrent xcievers "
+ + MAX_XCEIVER_COUNT);
+ }
long startTime = now();
switch ( op ) {
case OP_READ_BLOCK:
@@ -998,7 +1012,8 @@
} catch (Throwable t) {
LOG.error(dnRegistration + ":DataXceiver: " +
StringUtils.stringifyException(t));
} finally {
- LOG.debug(dnRegistration + ":Number of active connections is: "+xceiverCount);
+ LOG.debug(dnRegistration + ":Number of active connections is: "
+ + getXceiverCount());
IOUtils.closeStream(in);
IOUtils.closeSocket(s);
childSockets.remove(s);
@@ -1011,7 +1026,6 @@
* @throws IOException
*/
private void readBlock(DataInputStream in) throws IOException {
- xceiverCount.incr();
//
// Read in the header
//
@@ -1064,7 +1078,6 @@
StringUtils.stringifyException(ioe) );
throw ioe;
} finally {
- xceiverCount.decr();
IOUtils.closeStream(out);
IOUtils.closeStream(blockSender);
}
@@ -1077,7 +1090,6 @@
* @throws IOException
*/
private void writeBlock(DataInputStream in) throws IOException {
- xceiverCount.incr();
LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
" tcp no delay " + s.getTcpNoDelay());
//
@@ -1229,8 +1241,6 @@
IOUtils.closeStream(replyOut);
IOUtils.closeSocket(mirrorSock);
IOUtils.closeStream(blockReceiver);
- // decrement counter
- xceiverCount.decr();
}
}
@@ -1239,8 +1249,6 @@
* @param in
*/
void readMetadata(DataInputStream in) throws IOException {
- xceiverCount.incr();
-
Block block = new Block( in.readLong(), 0 );
MetaDataInputStream checksumIn = null;
DataOutputStream out = null;
@@ -1269,7 +1277,6 @@
//last DATA_CHUNK
out.writeInt(0);
} finally {
- xceiverCount.decr();
IOUtils.closeStream(out);
IOUtils.closeStream(checksumIn);
}
@@ -2683,6 +2690,7 @@
}
LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
+ shutdown();
}
/** Start a single datanode daemon and wait for it to finish.