Author: dhruba
Date: Fri Jun 6 10:18:01 2008
New Revision: 664041
URL: http://svn.apache.org/viewvc?rev=664041&view=rev
Log:
HADOOP-3503. Fix a race condition when client and namenode start simultaneous
recovery of the same block. (dhruba & Tsz Wo (Nicholas), SZE)
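The core of the fix is a per-block guard in LeaseManager.recoverBlock: whichever party (client-driven or namenode-driven recovery) registers the block first proceeds, and a concurrent second attempt fails fast with an IOException instead of starting a second recovery of the same block. Below is a minimal standalone sketch of that guard pattern; the class name and the simplified Block type are illustrative stand-ins, not the real org.apache.hadoop.dfs classes (see the LeaseManager.java hunk further down for the actual change).

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

class RecoveryGuardSketch {
  // Simplified stand-in for org.apache.hadoop.dfs.Block; only identity matters here.
  static class Block {
    final long blkid;
    Block(long blkid) { this.blkid = blkid; }
    @Override public boolean equals(Object o) {
      return o instanceof Block && ((Block) o).blkid == blkid;
    }
    @Override public int hashCode() { return (int) (blkid ^ (blkid >>> 32)); }
    @Override public String toString() { return "blk_" + blkid; }
  }

  // Blocks whose recovery is currently in progress.
  private static final Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();

  static void recoverBlock(Block block) throws IOException {
    synchronized (ongoingRecovery) {
      if (ongoingRecovery.containsKey(block)) {
        // The other party (client or namenode) got here first; do not recover twice.
        throw new IOException("Block " + block + " is already being recovered");
      }
      ongoingRecovery.put(block, block);
    }
    try {
      // ... contact the datanodes and synchronize the replicas here ...
    } finally {
      // Always release the guard, even if recovery fails.
      synchronized (ongoingRecovery) {
        ongoingRecovery.remove(block);
      }
    }
  }
}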
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestLeaseRecovery2.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=664041&r1=664040&r2=664041&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jun 6 10:18:01 2008
@@ -495,6 +495,9 @@
HADOOP-3135. Get the system directory from the JobTracker instead of from
the conf. (Subramaniam Krishnan via ddas)
+ HADOOP-3503. Fix a race condition when client and namenode start simultaneous
+ recovery of the same block. (dhruba & Tsz Wo (Nicholas), SZE)
+
Release 0.17.0 - 2008-05-18
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java?rev=664041&r1=664040&r2=664041&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java Fri Jun 6 10:18:01 2008
@@ -33,7 +33,7 @@
private INodeFile inode;
/**
- * This array contains trpilets of references.
+ * This array contains triplets of references.
* For each i-th data-node the block belongs to
* triplets[3*i] is the reference to the DatanodeDescriptor
* and triplets[3*i+1] and triplets[3*i+2] are references
@@ -138,23 +138,6 @@
return 0;
}
- /** Update this object */
- void update(long newgenerationstamp, long newlength,
- DatanodeDescriptor[] newtargets) {
- //remove all nodes
- for(int n = numNodes(); n >= 0; ) {
- removeNode(--n);
- }
-
- //add all targets
- for(DatanodeDescriptor d : newtargets) {
- addNode(d);
- }
-
- generationStamp = newgenerationstamp;
- len = newlength;
- }
-
/**
* Add data-node this block belongs to.
*/
@@ -339,10 +322,10 @@
/**
* Remove INode reference from block b.
- * Remove the block from the block map
- * only if it does not belong to any file and data-nodes.
+ * If it does not belong to any file and data-nodes,
+ * then remove the block from the block map.
*/
- public void removeINode(Block b) {
+ void removeINode(Block b) {
BlockInfo info = map.get(b);
if (info != null) {
info.inode = null;
@@ -352,6 +335,21 @@
}
}
+ /**
+ * Remove the block from the block map.
+ * If the mapped BlockInfo is not null,
+ * it also removes the datanodes associated with the BlockInfo.
+ */
+ void remove(Block b) {
+ BlockInfo info = map.remove(b);
+ if (info != null) {
+ info.inode = null;
+ for(int n = info.numNodes(); n >= 0; ) {
+ info.removeNode(--n);
+ }
+ }
+ }
+
/** Returns the block object it it exists in the map. */
BlockInfo getStoredBlock(Block b) {
return map.get(b);
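As context for the BlocksMap changes above: the "triplets" comment being corrected describes how BlockInfo packs, for each datanode that stores the block, three adjacent slots in a single object array (the full comment, cut off by the hunk boundary, describes the second and third slots as the previous and next blocks on that datanode's block list). A small sketch of that indexing, using plain Objects rather than the real DatanodeDescriptor/BlockInfo types, purely for illustration:

class TripletsSketch {
  // One Object[] holds 3 slots per datanode that stores the block:
  //   triplets[3*i]     -> the i-th datanode (DatanodeDescriptor in the real code)
  //   triplets[3*i + 1] -> previous block in that datanode's block list
  //   triplets[3*i + 2] -> next block in that datanode's block list
  private final Object[] triplets;

  TripletsSketch(int replication) {
    this.triplets = new Object[3 * replication];
  }

  Object getDatanode(int i) { return triplets[3 * i]; }
  Object getPrevious(int i) { return triplets[3 * i + 1]; }
  Object getNext(int i)     { return triplets[3 * i + 2]; }

  int numNodes() {
    // Slots are filled from index 0 upward, so count until the first empty datanode slot.
    int n = 0;
    while (n * 3 < triplets.length && triplets[n * 3] != null) {
      n++;
    }
    return n;
  }
}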
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=664041&r1=664040&r2=664041&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Fri Jun 6 10:18:01 2008
@@ -3067,9 +3067,7 @@
/** {@inheritDoc} */
public void updateBlock(Block oldblock, Block newblock, boolean finalize)
throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("oldblock=" + oldblock + ", newblock=" + newblock);
- }
+ LOG.info("oldblock=" + oldblock + ", newblock=" + newblock);
data.updateBlock(oldblock, newblock);
if (finalize) {
data.finalizeBlock(newblock);
@@ -3097,9 +3095,7 @@
/** {@inheritDoc} */
public Block recoverBlock(Block block, DatanodeInfo[] targets
) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("recoverBlock for block " + block);
- }
+ LOG.info("Client invoking recoverBlock for block " + block);
return LeaseManager.recoverBlock(block, targets, namenode,
getConf(), false);
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?rev=664041&r1=664040&r2=664041&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Fri Jun 6 10:18:01 2008
@@ -31,7 +31,7 @@
**********************************************************************/
interface DatanodeProtocol extends VersionedProtocol {
/**
- * 15: added DNA_RECOVERBLOCK, nextGenerationStamp and commitBlockSynchronization
+ * 16: Block parameter added to nextGenerationStamp().
*/
public static final long versionID = 15L;
@@ -135,9 +135,10 @@
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
/**
- * @return the next GenerationStamp
+ * @return the next GenerationStamp to be associated with the specified
+ * block.
*/
- public long nextGenerationStamp() throws IOException;
+ public long nextGenerationStamp(Block block) throws IOException;
/**
* Commit block synchronization in lease recovery
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=664041&r1=664040&r2=664041&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Fri Jun 6 10:18:01 2008
@@ -952,21 +952,17 @@
}
//
// If the original holder has not renewed in the last SOFTLIMIT
- // period, then reclaim all resources and allow this request
- // to proceed. Otherwise, prevent this request from creating file.
+ // period, then start lease recovery.
//
if (lease.expiredSoftLimit()) {
LOG.info("startFile: recover lease " + lease + ", src=" + src);
internalReleaseLease(lease, src);
- leaseManager.renewLease(lease);
- } else {
- throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- ", because this file is already being created by " +
- pendingFile.getClientName() +
- " on " + pendingFile.getClientMachine());
}
+ throw new AlreadyBeingCreatedException("failed to create file " + src
+ " for " + holder +
+ " on client " + clientMachine +
+ ", because this file is already being created by " +
+ pendingFile.getClientName() +
+ " on " + pendingFile.getClientMachine());
}
try {
@@ -1644,9 +1640,7 @@
* @param holder The datanode that was creating the file
*/
void internalReleaseLease(Lease lease, String src) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("lease=" + lease + ", src=" + src);
- }
+ LOG.info("Recovering lease=" + lease + ", src=" + src);
INodeFile iFile = dir.getFileINode(src);
if (iFile == null) {
@@ -1671,7 +1665,8 @@
// Initialize lease recovery for pendingFile. If there are no blocks
// associated with this file, then reap lease immediately. Otherwise
// renew the lease and trigger lease recovery.
- if (pendingFile.getTargets().length == 0) {
+ if (pendingFile.getTargets() == null ||
+ pendingFile.getTargets().length == 0) {
if (pendingFile.getBlocks().length == 0) {
finalizeINodeFileUnderConstruction(src, pendingFile);
NameNode.stateChangeLog.warn("BLOCK*"
@@ -1714,48 +1709,67 @@
long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets
) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("commitBlockSynchronization(lastblock=" + lastblock
+ LOG.info("commitBlockSynchronization(lastblock=" + lastblock
+ ", newgenerationstamp=" + newgenerationstamp
+ ", newlength=" + newlength
+ ", newtargets=" + Arrays.asList(newtargets) + ")");
- }
- BlockInfo blockinfo = blocksMap.getStoredBlock(lastblock);
- if (blockinfo == null) {
+ final BlockInfo oldblockinfo = blocksMap.getStoredBlock(lastblock);
+ if (oldblockinfo == null) {
throw new IOException("Block (=" + lastblock + ") not found");
}
- INodeFile iFile = blockinfo.getINode();
+ INodeFile iFile = oldblockinfo.getINode();
if (!iFile.isUnderConstruction()) {
throw new IOException("Unexpected block (=" + lastblock
+ ") since the file (=" + iFile.getLocalName()
+ ") is not under construction");
}
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
+
+ // Remove old block from blocks map. This always have to be done
+ // because the generationstamp of this block is changing.
+ blocksMap.remove(lastblock);
- //update block info
- if (newtargets.length > 0) {
- DatanodeDescriptor[] descriptors = new DatanodeDescriptor[newtargets.length];
- for(int i = 0; i < newtargets.length; i++) {
- descriptors[i] = getDatanode(newtargets[i]);
+ if (deleteblock) {
+ pendingFile.removeBlock(lastblock);
+ }
+ else {
+ // update last block, construct newblockinfo and add it to the blocks map
+ lastblock.set(lastblock.blkid, newlength, newgenerationstamp);
+ final BlockInfo newblockinfo = blocksMap.addINode(lastblock, pendingFile);
+
+ //update block info
+ DatanodeDescriptor[] descriptors = null;
+ if (newtargets.length > 0) {
+ descriptors = new DatanodeDescriptor[newtargets.length];
+ for(int i = 0; i < newtargets.length; i++) {
+ descriptors[i] = getDatanode(newtargets[i]);
+ descriptors[i].addBlock(newblockinfo);
+ }
}
- blockinfo.update(newgenerationstamp, newlength, descriptors);
+
+ pendingFile.setLastBlock(newblockinfo, descriptors);
}
// If this commit does not want to close the file, just persist
// block locations and return
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
String src = leaseManager.findPath(pendingFile);
- if (deleteblock) {
- dir.removeBlock(src, pendingFile, lastblock);
- }
if (!closeFile) {
dir.persistBlocks(src, pendingFile);
getEditLog().logSync();
+ LOG.info("commitBlockSynchronization(lastblock=" + lastblock
+ + ", newgenerationstamp=" + newgenerationstamp
+ + ", newlength=" + newlength
+ + ", newtargets=" + Arrays.asList(newtargets) + ") successful");
return;
}
//remove lease, close file
finalizeINodeFileUnderConstruction(src, pendingFile);
getEditLog().logSync();
+ LOG.info("commitBlockSynchronization(newblock=" + lastblock
+ + ", newgenerationstamp=" + newgenerationstamp
+ + ", newlength=" + newlength
+ + ", newtargets=" + Arrays.asList(newtargets) + ") successful");
}
@@ -4302,6 +4316,25 @@
return gs;
}
+ /**
+ * Verifies that the block is associated with a file that has a lease.
+ * Increments, logs and then returns the stamp
+ */
+ synchronized long nextGenerationStampForBlock(Block block) throws IOException {
+ String msg = "Block " + block + " is already commited.";
+ BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+ if (storedBlock == null) {
+ LOG.info(msg);
+ throw new IOException(msg);
+ }
+ INode fileINode = storedBlock.getINode();
+ if (!fileINode.isUnderConstruction()) {
+ LOG.info(msg);
+ throw new IOException(msg);
+ }
+ return nextGenerationStamp();
+ }
+
// rename was successful. If any part of the renamed subtree had
// files that were being written to, update with new filename.
//
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java?rev=664041&r1=664040&r2=664041&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java Fri Jun 6 10:18:01 2008
@@ -1029,6 +1029,16 @@
targets = null;
}
+ void setLastBlock(BlockInfo newblock, DatanodeDescriptor[] newtargets
+ ) throws IOException {
+ if (blocks == null) {
+ throw new IOException("Trying to update non-existant block (newblock="
+ + newblock + ")");
+ }
+ blocks[blocks.length - 1] = newblock;
+ setTargets(newtargets);
+ }
+
/**
* Initialize lease recovery for this object
*/
@@ -1048,6 +1058,8 @@
if (targets[j].isAlive) {
DatanodeDescriptor primary = targets[primaryNodeIndex = j];
primary.addBlockToBeRecovered(blocks[blocks.length - 1], targets);
+ NameNode.stateChangeLog.info("BLOCK* " + blocks[blocks.length - 1]
+ + " recovery started.");
}
}
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java?rev=664041&r1=664040&r2=664041&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java Fri Jun 6 10:18:01 2008
@@ -82,6 +82,10 @@
/** @return the lease containing src */
Lease getLeaseByPath(String src) {return sortedLeasesByPath.get(src);}
+ /** list of blocks being recovered */
+ private static Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();
+
+
/** @return the number of leases currently in the system */
synchronized int countLease() {return sortedLeases.size();}
@@ -416,39 +420,60 @@
static Block recoverBlock(Block block, DatanodeID[] datanodeids,
DatanodeProtocol namenode, Configuration conf,
boolean closeFile) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("block=" + block
- + ", datanodeids=" + Arrays.asList(datanodeids));
- }
- List<BlockRecord> syncList = new ArrayList<BlockRecord>();
- long minlength = Long.MAX_VALUE;
- int errorCount = 0;
- //check generation stamps
- for(DatanodeID id : datanodeids) {
- try {
- InterDatanodeProtocol datanode
- = DataNode.createInterDataNodeProtocolProxy(id, conf);
- BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
- if (info != null && info.getGenerationStamp() >= block.generationStamp) {
- syncList.add(new BlockRecord(id, datanode, new Block(info)));
- if (info.len < minlength) {
- minlength = info.len;
+ // If the block is already being recovered, then skip recovering it.
+ // This can happen if the namenode and client start recovering the same
+ // file at the same time.
+ synchronized (ongoingRecovery) {
+ Block tmp = new Block();
+ tmp.set(block.blkid, block.len, GenerationStamp.WILDCARD_STAMP);
+ if (ongoingRecovery.get(tmp) != null) {
+ String msg = "Block " + block + " is already being recovered, " +
+ " ignoring this request to recover it.";
+ LOG.info(msg);
+ throw new IOException(msg);
+ }
+ ongoingRecovery.put(block, block);
+ }
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("block=" + block
+ + ", datanodeids=" + Arrays.asList(datanodeids));
+ }
+ List<BlockRecord> syncList = new ArrayList<BlockRecord>();
+ long minlength = Long.MAX_VALUE;
+ int errorCount = 0;
+
+ //check generation stamps
+ for(DatanodeID id : datanodeids) {
+ try {
+ InterDatanodeProtocol datanode
+ = DataNode.createInterDataNodeProtocolProxy(id, conf);
+ BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
+ if (info != null && info.getGenerationStamp() >= block.generationStamp) {
+ syncList.add(new BlockRecord(id, datanode, new Block(info)));
+ if (info.len < minlength) {
+ minlength = info.len;
+ }
}
+ } catch (IOException e) {
+ ++errorCount;
+ InterDatanodeProtocol.LOG.warn(
+ "Failed to getBlockMetaDataInfo for block (=" + block
+ + ") from datanode (=" + id + ")", e);
}
- } catch (IOException e) {
- ++errorCount;
- InterDatanodeProtocol.LOG.warn(
- "Failed to getBlockMetaDataInfo for block (=" + block
- + ") from datanode (=" + id + ")", e);
}
- }
- if (syncList.isEmpty() && errorCount > 0) {
- throw new IOException("All datanodes failed: block=" + block
- + ", datanodeids=" + Arrays.asList(datanodeids));
+ if (syncList.isEmpty() && errorCount > 0) {
+ throw new IOException("All datanodes failed: block=" + block
+ + ", datanodeids=" + Arrays.asList(datanodeids));
+ }
+ return syncBlock(block, minlength, syncList, namenode, closeFile);
+ } finally {
+ synchronized (ongoingRecovery) {
+ ongoingRecovery.remove(block);
+ }
}
- return syncBlock(block, minlength, syncList, namenode, closeFile);
}
/** Block synchronization */
@@ -470,7 +495,7 @@
List<DatanodeID> successList = new ArrayList<DatanodeID>();
- long generationstamp = namenode.nextGenerationStamp();
+ long generationstamp = namenode.nextGenerationStamp(block);
Block newblock = new Block(block.blkid, minlength, generationstamp);
for(BlockRecord r : syncList) {
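Putting the LeaseManager pieces above together: once the ongoingRecovery guard admits a single recovery, syncBlock obtains a fresh generation stamp for this specific block (which the namenode now refuses if the block is no longer under construction), updates each surviving replica, and commits the result. A self-contained sketch of that sequence follows; the nested Block class and the Namenode/Replica interfaces are stubs standing in for the real Block, DatanodeProtocol and InterDatanodeProtocol types, so this is an illustration of the flow rather than the actual implementation.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class SyncBlockSketch {
  static class Block {
    final long blkid; final long len; final long stamp;
    Block(long blkid, long len, long stamp) { this.blkid = blkid; this.len = len; this.stamp = stamp; }
  }
  // Stub for the namenode side of the recovery RPCs.
  interface Namenode {
    long nextGenerationStamp(Block block) throws IOException;
    void commitBlockSynchronization(Block lastblock, long newstamp, long newlength,
        boolean closeFile, boolean deleteblock, String[] newtargets) throws IOException;
  }
  // Stub for one replica reachable over the inter-datanode protocol.
  interface Replica {
    String id();
    void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException;
  }

  static Block syncBlock(Block block, long minlength, List<Replica> replicas,
                         Namenode namenode, boolean closeFile) throws IOException {
    // 1. Fresh stamp for this block; fails if the block was already committed.
    long stamp = namenode.nextGenerationStamp(block);
    Block newblock = new Block(block.blkid, minlength, stamp);

    // 2. Truncate every surviving replica to the shortest length and restamp it.
    List<String> successList = new ArrayList<String>();
    for (Replica r : replicas) {
      try {
        r.updateBlock(block, newblock, closeFile);
        successList.add(r.id());
      } catch (IOException e) {
        // A replica that fails to update is simply dropped from the new targets.
      }
    }

    // 3. Record the new length, stamp and targets on the namenode.
    namenode.commitBlockSynchronization(block, stamp, minlength, closeFile, false,
        successList.toArray(new String[successList.size()]));
    return newblock;
  }
}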
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=664041&r1=664040&r2=664041&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Fri Jun 6 10:18:01 2008
@@ -373,8 +373,8 @@
}
/** {@inheritDoc} */
- public long nextGenerationStamp() {
- return namesystem.nextGenerationStamp();
+ public long nextGenerationStamp(Block block) throws IOException{
+ return namesystem.nextGenerationStampForBlock(block);
}
/** {@inheritDoc} */
Added: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestLeaseRecovery2.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestLeaseRecovery2.java?rev=664041&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestLeaseRecovery2.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestLeaseRecovery2.java Fri Jun 6 10:18:01 2008
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+public class TestLeaseRecovery2 extends junit.framework.TestCase {
+ static final int BLOCK_SIZE = 64;
+ static final int FILE_SIZE = 1024;
+ static final short REPLICATION_NUM = (short)3;
+ static final Random RANDOM = new Random();
+ static byte[] buffer = new byte[FILE_SIZE];
+
+ static void checkMetaInfo(Block b, InterDatanodeProtocol idp
+ ) throws IOException {
+ TestInterDatanodeProtocol.checkMetaInfo(b, idp, null);
+ }
+
+ static int min(Integer... x) {
+ int m = x[0];
+ for(int i = 1; i < x.length; i++) {
+ if (x[i] < m) {
+ m = x[i];
+ }
+ }
+ return m;
+ }
+
+ /**
+ */
+ public void testBlockSynchronization() throws Exception {
+ final long softLease = 1000;
+ final long hardLease = 60 * 60 *1000;
+ final short repl = 3;
+ Configuration conf = new Configuration();
+ conf.setLong("dfs.block.size", BLOCK_SIZE);
+ conf.setInt("io.bytes.per.checksum", 16);
+ MiniDFSCluster cluster = null;
+ byte[] actual = new byte[FILE_SIZE];
+
+ try {
+ cluster = new MiniDFSCluster(conf, 5, true, null);
+ cluster.waitActive();
+
+ //create a file
+ DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+ // create a random file name
+ String filestr = "/foo" + RANDOM.nextInt();
+ Path filepath = new Path(filestr);
+ FSDataOutputStream stm = dfs.create(filepath, true,
+ dfs.getConf().getInt("io.file.buffer.size",
4096),
+ (short)repl, (long)BLOCK_SIZE);
+ assertTrue(dfs.dfs.exists(filestr));
+
+ // write random number of bytes into it.
+ int size = RANDOM.nextInt(FILE_SIZE);
+ stm.write(buffer, 0, size);
+
+ // sync file
+ stm.sync();
+
+ // set the soft limit to be 1 second so that the
+ // namenode triggers lease recovery on next attempt to write-for-open.
+ cluster.setLeasePeriod(softLease, hardLease);
+
+ // try to re-open the file before closing the previous handle. This
+ // should fail but will trigger lease recovery.
+ String oldClientName = dfs.dfs.clientName;
+ dfs.dfs.clientName += "_1";
+ while (true) {
+ try {
+ FSDataOutputStream newstm = dfs.create(filepath, false,
+ dfs.getConf().getInt("io.file.buffer.size", 4096),
+ (short)repl, (long)BLOCK_SIZE);
+ assertTrue("Creation of an existing file should never succeed.",
false);
+ } catch (IOException e) {
+ if (e.getMessage().contains("file exists")) {
+ break;
+ }
+ e.printStackTrace();
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ System.out.println("Lease for file " + filepath + " is recovered. " +
+ "validating its contents now...");
+
+ // revert back client identity
+ dfs.dfs.clientName = oldClientName;
+
+ // verify that file-size matches
+ assertTrue("File should be " + size + " bytes, but is actually " +
+ " found to be " + dfs.getFileStatus(filepath).getLen() +
+ " bytes",
+ dfs.getFileStatus(filepath).getLen() == size);
+
+ // verify that there is enough data to read.
+ System.out.println("File size is good. Now validating sizes from
datanodes...");
+ FSDataInputStream stmin = dfs.open(filepath);
+ stmin.readFully(0, actual, 0, size);
+ stmin.close();
+ }
+ finally {
+ try {
+ if (cluster != null) {cluster.shutdown();}
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+}