Author: szetszwo
Date: Mon Jan 7 03:12:43 2013
New Revision: 1429655
URL: http://svn.apache.org/viewvc?rev=1429655&view=rev
Log:
HDFS-4351. In BlockPlacementPolicyDefault.chooseTarget(..), numOfReplicas
needs to be updated when avoiding stale nodes. Contributed by Andrew Wang
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1429655&r1=1429654&r2=1429655&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Mon Jan 7 03:12:43 2013
@@ -404,6 +404,9 @@ Release 1.2.0 - unreleased
MAPREDUCE-4915. TestShuffleExceptionCount fails with open JDK7.
(Brandon Li via suresh)
+ HDFS-4351. In BlockPlacementPolicyDefault.chooseTarget(..), numOfReplicas
+ needs to be updated when avoiding stale nodes. (Andrew Wang via szetszwo)
+
Release 1.1.2 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java?rev=1429655&r1=1429654&r2=1429655&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java Mon Jan 7 03:12:43 2013
@@ -161,6 +161,7 @@ public class BlockPlacementPolicyDefault
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return writer;
}
+ int totalReplicasExpected = numOfReplicas + results.size();
int numOfResults = results.size();
boolean newBlock = (numOfResults==0);
@@ -206,15 +207,22 @@ public class BlockPlacementPolicyDefault
maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e) {
FSNamesystem.LOG.warn("Not able to place enough replicas, still in need
of "
- + numOfReplicas);
+ + (totalReplicasExpected - results.size()) + " to reach "
+ + totalReplicasExpected + "\n"
+ + e.getMessage());
if (avoidStaleNodes) {
- // excludedNodes now has - initial excludedNodes, any nodes that were
- // chosen and nodes that were tried but were not chosen because they
- // were stale, decommissioned or for any other reason a node is not
- // chosen for write. Retry again now not avoiding stale node
+ // Retry chooseTarget again, this time not avoiding stale nodes.
+
+ // excludedNodes contains the initial excludedNodes and nodes that were
+ // not chosen because they were stale, decommissioned, etc.
+ // We need to additionally exclude the nodes that were added to the
+ // result list in the successful calls to choose*() above.
for (Node node : results) {
oldExcludedNodes.put(node, node);
}
+ // Set numOfReplicas, since it can get out of sync with the result list
+ // if the NotEnoughReplicasException was thrown in chooseRandom().
+ numOfReplicas = totalReplicasExpected - results.size();
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
maxNodesPerRack, results, false);
}
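
For readers following the accounting above: the fix restores the invariant numOfReplicas + results.size() == totalReplicasExpected before the retry. Below is a minimal, self-contained sketch of that arithmetic; it is illustrative only, not branch-1 code, and the class and node names are hypothetical.

import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch of the replica accounting fixed by HDFS-4351. */
public class ReplicaAccountingSketch {
  public static void main(String[] args) {
    int numOfReplicas = 5;                          // replicas still requested
    List<String> results = new ArrayList<String>(); // nodes already chosen

    // Snapshot the target count up front, as the patch does.
    final int totalReplicasExpected = numOfReplicas + results.size(); // 5

    // Suppose the first (stale-avoiding) pass places 3 replicas and then
    // fails with NotEnoughReplicasException.
    results.add("dn1");
    results.add("dn2");
    results.add("dn3");

    // Pre-fix: the retry reused numOfReplicas == 5, requesting 5 more
    // replicas even though 3 were already placed.
    // Post-fix: recompute the remaining count from the invariant.
    numOfReplicas = totalReplicasExpected - results.size(); // 2

    System.out.println("still in need of " + numOfReplicas
        + " to reach " + totalReplicasExpected);
  }
}

Running this prints "still in need of 2 to reach 5", which matches the form of the WARN message asserted in the test change below.
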
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java?rev=1429655&r1=1429654&r2=1429655&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java Mon Jan 7 03:12:43 2013
@@ -38,6 +38,10 @@ import org.apache.hadoop.hdfs.protocol.F
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
public class TestReplicationPolicy extends TestCase {
private static final int BLOCK_SIZE = 1024;
@@ -341,6 +345,84 @@ public class TestReplicationPolicy exten
return false;
}
+ /**
+ * This test case tries to choose more targets than there are available
+ * nodes and checks the result, with stale node avoidance on the write
+ * path enabled.
+ * @throws Exception
+ */
+ public void testChooseTargetWithMoreThanAvailableNodesWithStaleness()
+ throws Exception {
+ try {
+ namenode.getNamesystem().setAvoidStaleDataNodesForWrite(true);
+ testChooseTargetWithMoreThanAvailableNodes();
+ } finally {
+ namenode.getNamesystem().setAvoidStaleDataNodesForWrite(false);
+ }
+ }
+
+ /**
+ * This test case tries to choose more targets than there are available
+ * nodes and checks the result.
+ * @throws Exception
+ */
+ public void testChooseTargetWithMoreThanAvailableNodes() throws Exception {
+ // make data node 0 & 1 to be not qualified to choose: not enough disk
space
+ for(int i=0; i<2; i++) {
+ dataNodes[i].updateHeartbeat(
+ 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+ (FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0);
+ }
+
+ final TestAppender appender = new TestAppender();
+ final Logger logger = Logger.getRootLogger();
+ logger.addAppender(appender);
+
+ // try to choose NUM_OF_DATANODES targets, which is more than the number
+ // of actually available nodes.
+ DatanodeDescriptor[] targets = replicator.chooseTarget(filename,
+ NUM_OF_DATANODES, dataNodes[0], new ArrayList<DatanodeDescriptor>(),
+ BLOCK_SIZE);
+ assertEquals(targets.length, NUM_OF_DATANODES - 2);
+
+ final List<LoggingEvent> log = appender.getLog();
+ assertNotNull(log);
+ assertFalse(log.size() == 0);
+ final LoggingEvent lastLogEntry = log.get(log.size() - 1);
+
+ assertEquals(lastLogEntry.getLevel(), Level.WARN);
+ // Replicas are supposed to be placed on each node, but two data nodes
+ // are not available for placing replicas, so we expect a shortfall of 2.
+ assertTrue(((String)lastLogEntry.getMessage()).contains("in need of 2"));
+
+ for(int i=0; i<2; i++) {
+ dataNodes[i].updateHeartbeat(
+ 2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+ FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
+ }
+ }
+
+ class TestAppender extends AppenderSkeleton {
+ private final List<LoggingEvent> log = new ArrayList<LoggingEvent>();
+
+ @Override
+ public boolean requiresLayout() {
+ return false;
+ }
+
+ @Override
+ protected void append(final LoggingEvent loggingEvent) {
+ log.add(loggingEvent);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ public List<LoggingEvent> getLog() {
+ return new ArrayList<LoggingEvent>(log);
+ }
+ }
+
public void testChooseTargetWithStaleNodes() throws Exception {
// Enable avoiding writing to stale DataNodes
namenode.getNamesystem().setAvoidStaleDataNodesForWrite(true);