Author: dhruba
Date: Thu Oct  4 16:59:00 2007
New Revision: 582029

URL: http://svn.apache.org/viewvc?rev=582029&view=rev
Log:
HADOOP-1955. The Namenode tries to not pick the same source Datanode for
a replication request if the earlier replication request for the same
block and that source Datanode had failed.
(Raghu Angadi via dhruba)

This corresponds to changelist 582028 on trunk.
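In outline, the fix times out replication requests that never complete and reschedules them, while the datanode scan in FSNamesystem resumes after the last source it used, so a retried block tends to be offered by a different source datanode. A minimal sketch of the pending-request bookkeeping (hypothetical class and method names, for illustration only; the committed logic is in the PendingReplicationBlocks and FSNamesystem diffs below):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical, simplified stand-in for PendingReplicationBlocks:
    // remember when each block's replication was scheduled, and hand back
    // blocks whose request has been pending longer than the timeout so the
    // caller can reschedule them, starting from a different source datanode.
    class PendingSketch {
      private final Map<String, Long> pending = new HashMap<String, Long>();
      private final long timeoutMs;

      PendingSketch(long timeoutMs) { this.timeoutMs = timeoutMs; }

      synchronized void add(String blockId) {
        pending.put(blockId, System.currentTimeMillis());
      }

      synchronized List<String> getTimedOutBlocks() {
        List<String> timedOut = new ArrayList<String>();
        long now = System.currentTimeMillis();
        for (Map.Entry<String, Long> e : pending.entrySet()) {
          if (now - e.getValue() >= timeoutMs) {
            timedOut.add(e.getKey());
          }
        }
        // caller puts these back on the needed-replication list
        for (String b : timedOut) {
          pending.remove(b);
        }
        return timedOut;
      }
    }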
Modified:
    lucene/hadoop/branches/branch-0.14/CHANGES.txt
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
    lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
    lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java

Modified: lucene/hadoop/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/CHANGES.txt?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.14/CHANGES.txt Thu Oct  4 16:59:00 2007
@@ -26,6 +26,11 @@
     HADOOP-1978. Name-node removes edits.new after a successful startup.
     (Konstantin Shvachko via dhruba)
 
+    HADOOP-1955. The Namenode tries to not pick the same source Datanode for
+    a replication request if the earlier replication request for the same
+    block and that source Datanode had failed.
+    (Raghu Angadi via dhruba)
+
 Release 0.14.1 - 2007-09-04
 
 BUG FIXES

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java Thu Oct  4 16:59:00 2007
@@ -442,7 +442,9 @@
   private HashMap<Block,FSVolume> volumeMap = null;
   private HashMap<Block,File> blockMap = null;
   static Random random = new Random();
-
+
+  long blockWriteTimeout = 3600 * 1000;
+
   /**
    * An FSDataset has a directory where it loads its data files.
   */
@@ -457,6 +459,8 @@
     volumes.getVolumeMap(volumeMap);
     blockMap = new HashMap<Block,File>();
     volumes.getBlockMap(blockMap);
+    blockWriteTimeout = Math.max(
+        conf.getInt("dfs.datanode.block.write.timeout.sec", 3600), 1) * 1000;
   }
 
   /**
@@ -526,8 +530,9 @@
     //
     if (ongoingCreates.containsKey(b)) {
       // check how old is the temp file - wait 1 hour
-      File tmp = (File)ongoingCreates.get(b);
-      if ((System.currentTimeMillis() - tmp.lastModified()) < 3600 * 1000) {
+      File tmp = ongoingCreates.get(b);
+      if ((System.currentTimeMillis() - tmp.lastModified()) <
+          blockWriteTimeout) {
        throw new IOException("Block " + b +
                              " has already been started (though not completed), and thus cannot be created.");
      } else {
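The FSDataset change replaces the hard-coded one-hour wait on a stale temporary block file with a configurable timeout, clamped to at least one second. A sketch of the resulting arithmetic (the property name comes from the diff above; the 5-second value is the one the new test uses, not a default):

    // 'conf' is an org.apache.hadoop.conf.Configuration.
    // Unset -> 3600 sec (the old hard-coded hour); values < 1 clamp to 1 sec.
    long blockWriteTimeout =
        Math.max(conf.getInt("dfs.datanode.block.write.timeout.sec", 3600), 1) * 1000;

    // e.g. a test cluster can shrink the window so a failed write does not
    // block re-creation of the same block for a full hour:
    conf.setInt("dfs.datanode.block.write.timeout.sec", 5);   // 5000 ms effective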
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Oct  4 16:59:00 2007
@@ -183,7 +183,7 @@
   private long replicationRecheckInterval;
   //decommissionRecheckInterval is how often namenode checks if a node has finished decommission
   private long decommissionRecheckInterval;
-  static int replIndex = 0; // last datanode used for replication work
+  private int replIndex = 0; // last datanode used for replication work
   static int REPL_WORK_PER_ITERATION = 32; // max percent datanodes per iteration
 
   public static FSNamesystem fsNamesystemObject;
@@ -217,7 +217,9 @@
     this.dir.loadFSImage(getNamespaceDirs(conf), startOpt);
     this.safeMode = new SafeModeInfo(conf);
     setBlockTotal();
-    pendingReplications = new PendingReplicationBlocks(LOG);
+    pendingReplications = new PendingReplicationBlocks(LOG,
+                            conf.getInt("dfs.replication.pending.timeout.sec",
+                                        -1) * 1000);
     this.hbthread = new Daemon(new HeartbeatMonitor());
     this.lmthread = new Daemon(new LeaseMonitor());
     this.replthread = new Daemon(new ReplicationMonitor());
@@ -1886,6 +1888,7 @@
     int numiter = 0;
     int foundwork = 0;
     int hsize = 0;
+    int lastReplIndex = -1;
 
     while (true) {
       DatanodeDescriptor node = null;
@@ -1897,6 +1900,11 @@
       synchronized (heartbeats) {
         hsize = heartbeats.size();
         if (numiter++ >= hsize) {
+          // no change in replIndex.
+          if (lastReplIndex >= 0) {
+            //next time, start after where the last replication was scheduled
+            replIndex = lastReplIndex;
+          }
           break;
         }
         if (replIndex >= hsize) {
@@ -1922,6 +1930,7 @@
           doReplication = true;
           addBlocksToBeReplicated(node, (Block[])replsets[0],
                                   (DatanodeDescriptor[][])replsets[1]);
+          lastReplIndex = replIndex;
         }
       }
       if (!doReplication) {
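Two things to note in the FSNamesystem diff: replIndex now resumes after the datanode that last received replication work, and the pending-replication timeout is read with a default of -1, which PendingReplicationBlocks (next diff) treats as "keep the built-in default" since it only honors positive values. In sketch form:

    // Resolution of dfs.replication.pending.timeout.sec
    // ('conf' is an org.apache.hadoop.conf.Configuration):
    int secs = conf.getInt("dfs.replication.pending.timeout.sec", -1); // unset -> -1
    long timeoutMs = secs * 1000L;  // negative when unset
    // PendingReplicationBlocks applies the value only when positive, so an
    // unset or non-positive property leaves the class's default timeout alone.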
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java Thu Oct  4 16:59:00 2007
@@ -49,12 +49,14 @@
   private long defaultRecheckInterval = 5 * 60 * 1000;
 
   PendingReplicationBlocks(long timeoutPeriod) {
-    this.timeout = timeoutPeriod;
-    init();
+    this(null, timeoutPeriod);
   }
 
-  PendingReplicationBlocks(Log log) {
+  PendingReplicationBlocks(Log log, long timeoutPeriod) {
     this.LOG = log;
+    if ( timeoutPeriod > 0 ) {
+      this.timeout = timeoutPeriod;
+    }
     init();
   }

Modified: lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Thu Oct  4 16:59:00 2007
@@ -113,7 +113,9 @@
       conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
                new File(base_dir, "name2").getPath());
     }
-    conf.setInt("dfs.replication", Math.min(3, numDataNodes));
+
+    int replication = conf.getInt("dfs.replication", 3);
+    conf.setInt("dfs.replication", Math.min(replication, numDataNodes));
     conf.setInt("dfs.safemode.extension", 0);
     conf.setInt("dfs.namenode.decommission.interval", 3 * 1000); // 3 second
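Previously MiniDFSCluster overwrote any caller-supplied dfs.replication with min(3, numDataNodes); it now honors the caller's value and only caps it at the number of datanodes. That is what lets the new test below ask for replication 4 on a four-datanode cluster, roughly:

    // 'Configuration' and 'MiniDFSCluster' as in the diffs above/below;
    // the constructor call matches the one the test uses.
    Configuration conf = new Configuration();
    conf.setInt("dfs.replication", 4);   // now respected, capped at numDataNodes
    MiniDFSCluster cluster = new MiniDFSCluster(0, conf, 4, true, true, null, null);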
Modified: lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java Thu Oct  4 16:59:00 2007
@@ -19,6 +19,7 @@
 import junit.framework.TestCase;
 import java.io.*;
+import java.util.Iterator;
 import java.util.Random;
 import java.net.*;
@@ -166,4 +167,152 @@
       cluster.shutdown();
     }
   }
+
+  // Waits for all of the blocks to have expected replication
+  private void waitForBlockReplication(String filename,
+                                       ClientProtocol namenode,
+                                       int expected, long maxWaitSec)
+                                       throws IOException {
+    long start = System.currentTimeMillis();
+
+    //wait for all the blocks to be replicated;
+    System.out.println("Checking for block replication for " + filename);
+
+    int iters = 0;
+    while (true) {
+      boolean replOk = true;
+      LocatedBlocks blocks = namenode.getBlockLocations(filename, 0,
+                                                        Long.MAX_VALUE);
+
+      for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator();
+           iter.hasNext();) {
+        LocatedBlock block = iter.next();
+        int actual = block.getLocations().length;
+        if ( actual < expected ) {
+          if (true || iters > 0) {
+            System.out.println("Not enough replicas for " + block.getBlock() +
+                               " yet. Expecting " + expected + ", got " +
+                               actual + ".");
+          }
+          replOk = false;
+          break;
+        }
+      }
+
+      if (replOk) {
+        return;
+      }
+
+      iters++;
+
+      if (maxWaitSec > 0 &&
+          (System.currentTimeMillis() - start) > (maxWaitSec * 1000)) {
+        throw new IOException("Timedout while waiting for all blocks to " +
+                              " be replicated for " + filename);
+      }
+
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException ignored) {}
+    }
+  }
+
+  /* This test makes sure that NameNode retries all the available blocks
+   * for under replicated blocks.
+   *
+   * It creates a file with one block and replication of 4. It corrupts
+   * two of the blocks and removes one of the replicas. Expected behaviour is
+   * that missing replica will be copied from one valid source.
+   */
+  public void testPendingReplicationRetry() throws IOException {
+
+    MiniDFSCluster cluster = null;
+    int numDataNodes = 4;
+    String testFile = "/replication-test-file";
+    Path testPath = new Path(testFile);
+
+    byte buffer[] = new byte[1024];
+    for (int i=0; i<buffer.length; i++) {
+      buffer[i] = '1';
+    }
+
+    try {
+      Configuration conf = new Configuration();
+      conf.set("dfs.replication", Integer.toString(numDataNodes));
+      //first time format
+      cluster = new MiniDFSCluster(0, conf, numDataNodes, true,
+                                   true, null, null);
+      cluster.waitActive();
+      DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
+                                            cluster.getNameNodePort()),
+                                          conf);
+
+      OutputStream out = cluster.getFileSystem().create(testPath);
+      out.write(buffer);
+      out.close();
+
+      waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
+
+      // get first block of the file.
+      String block = dfsClient.namenode.
+                       getBlockLocations(testFile, 0, Long.MAX_VALUE).
+                       get(0).getBlock().toString();
+
+      cluster.shutdown();
+      cluster = null;
+
+      //Now mess up some of the replicas.
+      //Delete the first and corrupt the next two.
+      File baseDir = new File(System.getProperty("test.build.data"),
+                              "dfs/data");
+      for (int i=0; i<25; i++) {
+        buffer[i] = '0';
+      }
+
+      int fileCount = 0;
+      for (int i=0; i<6; i++) {
+        File blockFile = new File(baseDir, "data" + (i+1) + "/current/" + block);
+        System.out.println("Checking for file " + blockFile);
+
+        if (blockFile.exists()) {
+          if (fileCount == 0) {
+            assertTrue(blockFile.delete());
+          } else {
+            // corrupt it.
+            long len = blockFile.length();
+            assertTrue(len > 50);
+            RandomAccessFile blockOut = new RandomAccessFile(blockFile, "rw");
+            blockOut.seek(len/3);
+            blockOut.write(buffer, 0, 25);
+          }
+          fileCount++;
+        }
+      }
+      assertEquals(3, fileCount);
+
+      /* Start the MiniDFSCluster with more datanodes since once a writeBlock
+       * to a datanode node fails, same block can not be written to it
+       * immediately. In our case some replication attempts will fail.
+       */
+      conf = new Configuration();
+      conf.set("dfs.replication", Integer.toString(numDataNodes));
+      conf.set("dfs.replication.pending.timeout.sec", Integer.toString(2));
+      conf.set("dfs.datanode.block.write.timeout.sec", Integer.toString(5));
+      conf.set("dfs.safemode.threshold.pct", "0.75f"); // only 3 copies exist
+
+      cluster = new MiniDFSCluster(0, conf, numDataNodes*2, false,
+                                   true, null, null);
+      cluster.waitActive();
+
+      dfsClient = new DFSClient(new InetSocketAddress("localhost",
+                                  cluster.getNameNodePort()),
+                                conf);
+
+      waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
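Taken together, the tunables this change introduces can be set before starting a cluster; the values below are the ones the test uses, not defaults:

    // 'conf' is an org.apache.hadoop.conf.Configuration.
    // Retry a replication request that has been pending for 2 seconds
    // (<= 0, the default, keeps PendingReplicationBlocks' built-in timeout).
    conf.setInt("dfs.replication.pending.timeout.sec", 2);
    // Let a datanode accept a re-write of a block 5 seconds after a failed
    // attempt, instead of the previous hard-coded hour.
    conf.setInt("dfs.datanode.block.write.timeout.sec", 5);

With a short pending timeout, a replication request whose source datanode failed is requeued quickly, and because replIndex resumes after the last scheduled node, the retry is offered to a different source.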