Author: hairong
Date: Mon Feb 2 19:03:12 2009
New Revision: 740077
URL: http://svn.apache.org/viewvc?rev=740077&view=rev
Log:
HADOOP-5034. NameNode should send both replication and deletion requests to
DataNode in one reply to a heartbeat. Contributed by Hairong Kuang.
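For orientation, the gist of the change is the heartbeat reply type: it used to carry at most one DatanodeCommand, and it now carries an array, so a replication request and a deletion request can ride in the same reply. A compilable sketch with stand-in stub types (the authoritative signatures are in the DatanodeProtocol.java diff below; parameter order follows that diff):

    // Stub types standing in for the real classes in
    // org.apache.hadoop.hdfs.server.protocol, for illustration only.
    class DatanodeCommand {}
    class DatanodeRegistration {}

    // Version 18 of DatanodeProtocol: at most one command per heartbeat reply.
    interface HeartbeatV18 {
      DatanodeCommand sendHeartbeat(DatanodeRegistration reg, long capacity,
          long dfsUsed, long remaining, int xmitsInProgress, int xceiverCount);
    }

    // Version 19: an array, so DNA_TRANSFER (replication) and DNA_INVALIDATE
    // (deletion) commands can be returned together; null means nothing to do.
    interface HeartbeatV19 {
      DatanodeCommand[] sendHeartbeat(DatanodeRegistration reg, long capacity,
          long dfsUsed, long remaining, int xmitsInProgress, int xceiverCount);
    }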
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=740077&r1=740076&r2=740077&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Feb 2 19:03:12 2009
@@ -741,6 +741,9 @@
HADOOP-4862. Minor : HADOOP-3678 did not remove all the cases of
spurious IOExceptions logged by DataNode. (Raghu Angadi)
+ HADOOP-5034. NameNode should send both replication and deletion requests
+ to DataNode in one reply to a heartbeat. (hairong)
+
Release 0.19.0 - 2008-11-18
INCOMPATIBLE CHANGES
Modified:
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=740077&r1=740076&r2=740077&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Feb 2 19:03:12 2009
@@ -694,7 +694,7 @@
// -- Bytes remaining
//
lastHeartbeat = startTime;
- DatanodeCommand cmd = namenode.sendHeartbeat(dnRegistration,
+ DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
data.getCapacity(),
data.getDfsUsed(),
data.getRemaining(),
@@ -702,7 +702,7 @@
getXceiverCount());
myMetrics.heartbeats.inc(now() - startTime);
//LOG.info("Just sent heartbeat, with name " + localName);
- if (!processCommand(cmd))
+ if (!processCommand(cmds))
continue;
}
@@ -812,6 +812,27 @@
} // while (shouldRun)
} // offerService
+ /**
+ * Process an array of datanode commands
+ *
+ * @param cmds an array of datanode commands
+ * @return true if further processing may be required or false otherwise.
+ */
+ private boolean processCommand(DatanodeCommand[] cmds) {
+ if (cmds != null) {
+ for (DatanodeCommand cmd : cmds) {
+ try {
+ if (processCommand(cmd) == false) {
+ return false;
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Error processing datanode Command", ioe);
+ }
+ }
+ }
+ return true;
+ }
+
/**
*
* @param cmd
Modified:
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=740077&r1=740076&r2=740077&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Feb 2 19:03:12 2009
@@ -2132,10 +2132,10 @@
* If a substantial amount of time passed since the last datanode
* heartbeat then request an immediate block report.
*
- * @return a datanode command
+ * @return an array of datanode commands
* @throws IOException
*/
- DatanodeCommand handleHeartbeat(DatanodeRegistration nodeReg,
+ DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
long capacity, long dfsUsed, long remaining,
int xceiverCount, int xmitsInProgress) throws IOException {
DatanodeCommand cmd = null;
@@ -2145,7 +2145,7 @@
try {
nodeinfo = getDatanode(nodeReg);
} catch(UnregisteredDatanodeException e) {
- return DatanodeCommand.REGISTER;
+ return new DatanodeCommand[]{DatanodeCommand.REGISTER};
}
// Check if this datanode should actually be shutdown instead.
@@ -2155,7 +2155,7 @@
}
if (nodeinfo == null || !nodeinfo.isAlive) {
- return DatanodeCommand.REGISTER;
+ return new DatanodeCommand[]{DatanodeCommand.REGISTER};
}
updateStats(nodeinfo, false);
@@ -2163,26 +2163,35 @@
updateStats(nodeinfo, true);
//check lease recovery
- if (cmd == null) {
- cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
+ cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
+ if (cmd != null) {
+ return new DatanodeCommand[] {cmd};
}
+
+ ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(2);
//check pending replication
- if (cmd == null) {
- cmd = nodeinfo.getReplicationCommand(
+ cmd = nodeinfo.getReplicationCommand(
maxReplicationStreams - xmitsInProgress);
+ if (cmd != null) {
+ cmds.add(cmd);
}
//check block invalidation
- if (cmd == null) {
- cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+ cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+ if (cmd != null) {
+ cmds.add(cmd);
+ }
+ if (!cmds.isEmpty()) {
+ return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
}
}
//check distributed upgrade
- if (cmd == null) {
- cmd = getDistributedUpgradeCommand();
+ cmd = getDistributedUpgradeCommand();
+ if (cmd != null) {
+ return new DatanodeCommand[] {cmd};
}
- return cmd;
+ return null;
}
private void updateStats(DatanodeDescriptor node, boolean isAdded) {
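Two behavioral points in the hunk above are easy to miss: a lease-recovery command still pre-empts everything and is returned alone, and the distributed-upgrade command is only consulted when no block work is pending; only replication and invalidation are actually batched together. A self-contained sketch of that reply-building order (hypothetical stub type Cmd; the real logic is in handleHeartbeat above):

    import java.util.ArrayList;
    import java.util.List;

    public class HeartbeatReplySketch {
      static class Cmd {}  // stand-in for DatanodeCommand

      /** Mirrors the order in which the new handleHeartbeat builds its reply. */
      static Cmd[] buildReply(Cmd leaseRecovery, Cmd replicate,
                              Cmd invalidate, Cmd upgrade) {
        if (leaseRecovery != null) {
          return new Cmd[] { leaseRecovery };  // pre-empts the rest, sent alone
        }
        List<Cmd> cmds = new ArrayList<Cmd>(2);
        if (replicate != null)  cmds.add(replicate);   // DNA_TRANSFER
        if (invalidate != null) cmds.add(invalidate);  // DNA_INVALIDATE
        if (!cmds.isEmpty()) {
          return cmds.toArray(new Cmd[cmds.size()]);   // both in one reply
        }
        // Only reached when no block work is pending.
        return upgrade == null ? null : new Cmd[] { upgrade };
      }

      public static void main(String[] args) {
        Cmd[] reply = buildReply(null, new Cmd(), new Cmd(), null);
        System.out.println(reply.length);  // 2: replication and deletion together
      }
    }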
Modified:
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=740077&r1=740076&r2=740077&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Mon Feb 2 19:03:12 2009
@@ -697,10 +697,10 @@
/**
* Data node notify the name node that it is alive
- * Return a block-oriented command for the datanode to execute.
+ * Return an array of block-oriented commands for the datanode to execute.
* This will be either a transfer or a delete operation.
*/
- public DatanodeCommand sendHeartbeat(DatanodeRegistration nodeReg,
+ public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg,
long capacity,
long dfsUsed,
long remaining,
Modified:
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=740077&r1=740076&r2=740077&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Mon Feb 2 19:03:12 2009
@@ -35,15 +35,10 @@
**********************************************************************/
public interface DatanodeProtocol extends VersionedProtocol {
/**
- * 18: In sendHeartbeat, the capacity parameter reported was sum of
- * the filesystem disk space of all the data directories. This is
- * changed to exclude the reserved capacity defined by
- * dfs.datanode.du.reserved.
- *
- * The new capacity reported is sum of the filesystem disk space of
- * all the data directories minus the reserved capacity.
+ * 19: sendHeartbeat returns an array of DatanodeCommand objects
+ * instead of a single DatanodeCommand object.
*/
- public static final long versionID = 18L;
+ public static final long versionID = 19L;
// error code
final static int NOTIFY = 0;
@@ -77,11 +72,12 @@
/**
* sendHeartbeat() tells the NameNode that the DataNode is still
* alive and well. Includes some status info, too.
- * It also gives the NameNode a chance to return a "DatanodeCommand" object.
+ * It also gives the NameNode a chance to return
+ * an array of "DatanodeCommand" objects.
* A DatanodeCommand tells the DataNode to invalidate local block(s),
* or to copy them to other DataNodes, etc.
*/
- public DatanodeCommand sendHeartbeat(DatanodeRegistration registration,
+ public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration,
long capacity,
long dfsUsed, long remaining,
int xmitsInProgress,
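Under the version-19 contract a reply is either null (nothing to do) or a non-empty array that the datanode processes in order, which is what the new DataNode.processCommand(DatanodeCommand[]) above implements. A minimal consumer loop in the same spirit (stub type and a hypothetical handleOne dispatch, not the actual DataNode code):

    public class HeartbeatConsumerSketch {
      static class DatanodeCommand { int getAction() { return 0; } }  // stub

      // Returns false when the caller should skip the rest of its service-loop
      // iteration, mirroring the contract of DataNode.processCommand(cmds).
      static boolean process(DatanodeCommand[] cmds) {
        if (cmds == null) return true;        // null reply: nothing to do
        for (DatanodeCommand cmd : cmds) {
          if (!handleOne(cmd)) return false;  // e.g. told to re-register
        }
        return true;
      }

      static boolean handleOne(DatanodeCommand cmd) {
        // Dispatch on cmd.getAction(): DNA_TRANSFER, DNA_INVALIDATE, ...
        return true;
      }
    }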
Modified:
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=740077&r1=740076&r2=740077&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Mon Feb 2 19:03:12 2009
@@ -724,10 +724,13 @@
*/
void sendHeartbeat() throws IOException {
// register datanode
- DatanodeCommand cmd = nameNode.sendHeartbeat(
+ DatanodeCommand[] cmds = nameNode.sendHeartbeat(
dnRegistration, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, 0, 0);
- if(cmd != null)
- LOG.debug("sendHeartbeat Name-node reply: " + cmd.getAction());
+ if(cmds != null) {
+ for (DatanodeCommand cmd : cmds ) {
+ LOG.debug("sendHeartbeat Name-node reply: " + cmd.getAction());
+ }
+ }
}
boolean addBlock(Block blk) {
@@ -755,13 +758,18 @@
*/
int replicateBlocks() throws IOException {
// register datanode
- DatanodeCommand cmd = nameNode.sendHeartbeat(
+ DatanodeCommand[] cmds = nameNode.sendHeartbeat(
dnRegistration, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, 0, 0);
- if(cmd == null || cmd.getAction() != DatanodeProtocol.DNA_TRANSFER)
- return 0;
- // Send a copy of a block to another datanode
- BlockCommand bcmd = (BlockCommand)cmd;
- return transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
+ if (cmds != null) {
+ for (DatanodeCommand cmd : cmds) {
+ if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
+ // Send a copy of a block to another datanode
+ BlockCommand bcmd = (BlockCommand)cmd;
+ return transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
+ }
+ }
+ }
+ return 0;
}
/**
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java?rev=740077&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java Mon Feb 2 19:03:12 2009
@@ -0,0 +1,88 @@
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+
+import junit.framework.TestCase;
+
+/**
+ * Test if FSNamesystem handles heartbeat right
+ */
+public class TestHeartbeatHandling extends TestCase {
+ /**
+ * Test if {@link FSNamesystem#handleHeartbeat(DatanodeRegistration, long, long, long, int, int)}
+ * can pick up replication and/or invalidate requests and
+ * observes the max limit
+ */
+ public void testHeartbeat() throws Exception {
+ final Configuration conf = new Configuration();
+ final MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+ try {
+ cluster.waitActive();
+ final FSNamesystem namesystem = cluster.getNamesystem();
+ final DatanodeRegistration nodeReg = cluster.getDataNodes().get(0).dnRegistration;
+ DatanodeDescriptor dd = namesystem.getDatanode(nodeReg);
+
+ final int REMAINING_BLOCKS = 1;
+ final int MAX_REPLICATE_LIMIT = conf.getInt("dfs.max-repl-streams", 2);
+ final int MAX_INVALIDATE_LIMIT = FSNamesystem.BLOCK_INVALIDATE_CHUNK;
+ final int MAX_INVALIDATE_BLOCKS = 2*MAX_INVALIDATE_LIMIT+REMAINING_BLOCKS;
+ final int MAX_REPLICATE_BLOCKS = 2*MAX_REPLICATE_LIMIT+REMAINING_BLOCKS;
+ final DatanodeDescriptor[] ONE_TARGET = new DatanodeDescriptor[1];
+
+ synchronized (namesystem.heartbeats) {
+ for (int i=0; i<MAX_REPLICATE_BLOCKS; i++) {
+ dd.addBlockToBeReplicated(
+ new Block(i, 0, GenerationStamp.FIRST_VALID_STAMP), ONE_TARGET);
+ }
+ DatanodeCommand[] cmds = namesystem.handleHeartbeat(
+ nodeReg, dd.getCapacity(), dd.getDfsUsed(), dd.getRemaining(), 0, 0);
+ assertEquals(1, cmds.length);
+ assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
+ assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
+
+ ArrayList<Block> blockList = new ArrayList<Block>(MAX_INVALIDATE_BLOCKS);
+ for (int i=0; i<MAX_INVALIDATE_BLOCKS; i++) {
+ blockList.add(new Block(i, 0, GenerationStamp.FIRST_VALID_STAMP));
+ }
+ dd.addBlocksToBeInvalidated(blockList);
+
+ cmds = namesystem.handleHeartbeat(
+ nodeReg, dd.getCapacity(), dd.getDfsUsed(), dd.getRemaining(), 0, 0);
+ assertEquals(2, cmds.length);
+ assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
+ assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
+ assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[1].getAction());
+ assertEquals(MAX_INVALIDATE_LIMIT, ((BlockCommand)cmds[1]).getBlocks().length);
+
+ cmds = namesystem.handleHeartbeat(
+ nodeReg, dd.getCapacity(), dd.getDfsUsed(), dd.getRemaining(), 0, 0);
+ assertEquals(2, cmds.length);
+ assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
+ assertEquals(REMAINING_BLOCKS, ((BlockCommand)cmds[0]).getBlocks().length);
+ assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[1].getAction());
+ assertEquals(MAX_INVALIDATE_LIMIT, ((BlockCommand)cmds[1]).getBlocks().length);
+
+ cmds = namesystem.handleHeartbeat(
+ nodeReg, dd.getCapacity(), dd.getDfsUsed(), dd.getRemaining(), 0, 0);
+ assertEquals(1, cmds.length);
+ assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[0].getAction());
+ assertEquals(REMAINING_BLOCKS, ((BlockCommand)cmds[0]).getBlocks().length);
+
+ cmds = namesystem.handleHeartbeat(
+ nodeReg, dd.getCapacity(), dd.getDfsUsed(), dd.getRemaining(), 0, 0);
+ assertEquals(null, cmds);
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+}