HDFS-8999. Allow a file to be closed with COMMITTED but not yet COMPLETE blocks.
(cherry picked from commit b10d8ced21a860390c46e7729a02b81d9f7b88e6) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0ad3c51d Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0ad3c51d Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0ad3c51d Branch: refs/heads/branch-2.8 Commit: 0ad3c51dfb4ca50677e078ed870e8ec120436ea4 Parents: 1d15c90 Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Authored: Thu Jan 28 10:42:40 2016 +0800 Committer: Vinayakumar B <vinayakum...@apache.org> Committed: Wed Feb 3 07:41:06 2016 +0530 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hdfs/DFSClient.java | 41 ++++++++--- .../org/apache/hadoop/hdfs/DFSUtilClient.java | 55 +++++++++------ .../org/apache/hadoop/hdfs/DataStreamer.java | 3 +- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 ++ .../server/blockmanagement/BlockManager.java | 31 +++----- .../hdfs/server/namenode/FSDirAppendOp.java | 15 +++- .../hdfs/server/namenode/FSDirWriteFileOp.java | 4 +- .../hdfs/server/namenode/FSEditLogLoader.java | 8 +-- .../hdfs/server/namenode/FSNamesystem.java | 74 +++++++++++++++----- .../hadoop/hdfs/server/namenode/INodeFile.java | 56 +++++++++++---- .../hdfs/server/namenode/LeaseManager.java | 17 ++--- .../org/apache/hadoop/hdfs/TestFileAppend.java | 56 ++++++++++++++- .../hdfs/server/namenode/TestINodeFile.java | 6 +- .../hdfs/server/namenode/TestLeaseManager.java | 4 +- 15 files changed, 271 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 088b10e..6271da2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -34,7 +34,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.lang.reflect.Proxy; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; @@ -168,6 +167,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.RpcNoSuchMethodException; import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.NetUtils; @@ -182,16 +182,15 @@ import org.apache.hadoop.util.DataChecksum.Type; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Time; import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.net.InetAddresses; -import org.apache.htrace.core.Tracer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /******************************************************** * DFSClient can connect to a Hadoop Filesystem and @@ -1355,17 +1354,43 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } } + /** + * Invoke namenode append RPC. + * It retries in case of {@link RetriableException}.
+ */ + private LastBlockWithStatus callAppend(String src, + EnumSetWritable<CreateFlag> flag) throws IOException { + final long startTime = Time.monotonicNow(); + for(;;) { + try { + return namenode.append(src, clientName, flag); + } catch(RemoteException re) { + if (Time.monotonicNow() - startTime > 5000 + || !RetriableException.class.getName().equals( + re.getClassName())) { + throw re; + } + + try { // sleep and retry + Thread.sleep(500); + } catch (InterruptedException e) { + throw DFSUtilClient.toInterruptedIOException("callAppend", e); + } + } + } + } + /** Method to get stream returned by append call */ private DFSOutputStream callAppend(String src, EnumSet<CreateFlag> flag, Progressable progress, String[] favoredNodes) throws IOException { CreateFlag.validateForAppend(flag); try { - LastBlockWithStatus blkWithStatus = namenode.append(src, clientName, + final LastBlockWithStatus blkWithStatus = callAppend(src, new EnumSetWritable<>(flag, CreateFlag.class)); HdfsFileStatus status = blkWithStatus.getFileStatus(); if (status == null) { - DFSClient.LOG.debug("NameNode is on an older version, request file " + - "info with additional RPC call for file: " + src); + LOG.debug("NameNode is on an older version, request file " + + "info with additional RPC call for file: {}", src); status = getFileInfo(src); } return DFSOutputStream.newStreamForAppend(this, src, flag, progress, http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index 61a34c2..2e891a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -17,9 +17,29 @@ */ package org.apache.hadoop.hdfs; -import com.google.common.base.Joiner; -import com.google.common.collect.Maps; -import com.google.common.primitives.SignedBytes; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.UnsupportedEncodingException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.channels.SocketChannel; +import java.text.SimpleDateFormat; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import javax.net.SocketFactory; + import org.apache.commons.io.Charsets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.KeyProvider; @@ -52,26 +72,9 @@ import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.SocketFactory; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.channels.SocketChannel; -import java.text.SimpleDateFormat; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; - -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES; +import 
com.google.common.base.Joiner; +import com.google.common.collect.Maps; +import com.google.common.primitives.SignedBytes; public class DFSUtilClient { public static final byte[] EMPTY_BYTES = {}; @@ -676,4 +679,10 @@ public class DFSUtilClient { } } + public static InterruptedIOException toInterruptedIOException(String message, + InterruptedException e) { + final InterruptedIOException iioe = new InterruptedIOException(message); + iioe.initCause(e); + return iioe; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index b406c67..abbb1ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -455,8 +455,7 @@ class DataStreamer extends Daemon { setPipeline(lastBlock); if (nodes.length < 1) { throw new IOException("Unable to retrieve blocks locations " + - " for last block " + block + - "of file " + src); + " for last block " + block + " of file " + src); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 1eb7ff7..824b869 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -956,6 +956,9 @@ Release 2.8.0 - UNRELEASED HDFS-9436. Make NNThroughputBenchmark$BlockReportStats run with 10 datanodes by default. (Mingliang Liu via shv) + HDFS-8999. 
Allow a file to be closed with COMMITTED but not yet COMPLETE + blocks. (szetszwo) + BUG FIXES HDFS-8091: ACLStatus and XAttributes should be presented to http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 7450730..f981b33 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -189,6 +189,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_REPLICATION_MIN_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MIN_KEY; public static final int DFS_NAMENODE_REPLICATION_MIN_DEFAULT = 1; + public static final String DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY + = "dfs.namenode.file.close.num-committed-allowed"; + public static final int DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_DEFAULT + = 0; public static final String DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY = "dfs.namenode.safemode.replication.min"; public static final String DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY = http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index e6ec759..6bf0cc2 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -618,6 +618,10 @@ public class BlockManager implements BlockStatsMXBean { return (countNodes(block).liveReplicas() >= minReplication); } + public short getMinReplication() { + return minReplication; + } + /** * Commit a block of a file * @@ -665,7 +669,7 @@ public class BlockManager implements BlockStatsMXBean { final boolean b = commitBlock(lastBlock, commitBlock); if (countNodes(lastBlock).liveReplicas() >= minReplication) { if (b) { - addExpectedReplicasToPending(lastBlock); + addExpectedReplicasToPending(lastBlock, bc); } completeBlock(lastBlock, false); } @@ -677,6 +681,10 @@ public class BlockManager implements BlockStatsMXBean { * pendingReplications in order to keep ReplicationMonitor from scheduling * the block. */ + public void addExpectedReplicasToPending(BlockInfo blk, BlockCollection bc) { + addExpectedReplicasToPending(blk); + } + private void addExpectedReplicasToPending(BlockInfo lastBlock) { DatanodeStorageInfo[] expectedStorages = lastBlock.getUnderConstructionFeature().getExpectedStorageLocations(); @@ -2617,7 +2625,7 @@ public class BlockManager implements BlockStatsMXBean { if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED && numLiveReplicas >= minReplication) { - addExpectedReplicasToPending(storedBlock); + addExpectedReplicasToPending(storedBlock, bc); completeBlock(storedBlock, false); } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) { // check whether safe replication is reached for the block @@ -3453,25 +3461,6 @@ public class BlockManager implements BlockStatsMXBean { } } - /** - * Check that the indicated blocks are present and - * replicated. 
- */ - public boolean checkBlocksProperlyReplicated( - String src, BlockInfo[] blocks) { - for (BlockInfo b: blocks) { - if (!b.isComplete()) { - final int numNodes = b.numNodes(); - LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = " - + b.getBlockUCState() + ", replication# = " + numNodes - + (numNodes < minReplication ? " < ": " >= ") - + " minimum = " + minReplication + ") in file " + src); - return false; - } - } - return true; - } - /** * @return 0 if the block is not found; * otherwise, return the replication factor of the block. http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java index 53255e6..e5b1392 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java @@ -33,8 +33,10 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp; import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature; +import org.apache.hadoop.ipc.RetriableException; import com.google.common.base.Preconditions; @@ -119,10 +121,17 @@ final class FSDirAppendOp { final BlockInfo lastBlock = file.getLastBlock(); // Check that the block has at least minimum 
replication. - if (lastBlock != null && lastBlock.isComplete() + if (lastBlock != null) { + if (lastBlock.getBlockUCState() == BlockUCState.COMMITTED) { + throw new RetriableException( + new NotReplicatedYetException("append: lastBlock=" + + lastBlock + " of src=" + path + + " is COMMITTED but not yet COMPLETE.")); + } else if (lastBlock.isComplete() && !blockManager.isSufficientlyReplicated(lastBlock)) { - throw new IOException("append: lastBlock=" + lastBlock + " of src=" - + path + " is not sufficiently replicated yet."); + throw new IOException("append: lastBlock=" + lastBlock + " of src=" + + path + " is not sufficiently replicated yet."); + } } lb = prepareFileForAppend(fsn, iip, holder, clientMachine, newBlock, true, logRetryCache); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index 3662bce..17e2459 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -758,8 +758,10 @@ class FSDirWriteFileOp { return false; } + fsn.addCommittedBlocksToPending(pendingFile); + fsn.finalizeINodeFileUnderConstruction(src, pendingFile, - Snapshot.CURRENT_STATE_ID); + Snapshot.CURRENT_STATE_ID, true); return true; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 7077990..094bb9e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hdfs.server.namenode; -import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp; import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade; import static org.apache.hadoop.util.Time.monotonicNow; @@ -29,7 +28,6 @@ import java.util.EnumMap; import java.util.EnumSet; import java.util.List; -import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -89,6 +87,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; @@ -446,8 +445,9 @@ public class FSEditLogLoader { // One might expect that you could use removeLease(holder, path) here, // but OP_CLOSE doesn't serialize the holder. So, remove the inode. 
if (file.isUnderConstruction()) { - fsNamesys.leaseManager.removeLeases(Lists.newArrayList(file.getId())); - file.toCompleteFile(file.getModificationTime()); + fsNamesys.getLeaseManager().removeLease(file.getId()); + file.toCompleteFile(file.getModificationTime(), 0, + fsNamesys.getBlockManager().getMinReplication()); } break; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 1ecb286..e8eae64 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -463,6 +463,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, private final long minBlockSize; // minimum block size final long maxBlocksPerFile; // maximum # of blocks per file + private final int numCommittedAllowed; /** Lock to protect FSNamesystem. 
*/ private final FSNamesystemLock fsLock; @@ -769,6 +770,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT); this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY, DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT); + this.numCommittedAllowed = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, + DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_DEFAULT); this.supportAppends = conf.getBoolean(DFS_SUPPORT_APPEND_KEY, DFS_SUPPORT_APPEND_DEFAULT); LOG.info("Append Enabled: " + supportAppends); @@ -2602,18 +2606,37 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, boolean checkFileProgress(String src, INodeFile v, boolean checkall) { assert hasReadLock(); if (checkall) { - return blockManager.checkBlocksProperlyReplicated(src, v - .getBlocks()); + return checkBlocksComplete(src, true, v.getBlocks()); } else { - // check the penultimate block of this file - BlockInfo b = v.getPenultimateBlock(); - return b == null || - blockManager.checkBlocksProperlyReplicated( - src, new BlockInfo[] { b }); + final BlockInfo[] blocks = v.getBlocks(); + final int i = blocks.length - numCommittedAllowed - 2; + return i < 0 || blocks[i] == null + || checkBlocksComplete(src, false, blocks[i]); } } /** + * Check if the blocks are COMPLETE; + * it may allow the last block to be COMMITTED. + */ + private boolean checkBlocksComplete(String src, boolean allowCommittedBlock, + BlockInfo... blocks) { + final int n = allowCommittedBlock? numCommittedAllowed: 0; + for(int i = 0; i < blocks.length; i++) { + final short min = blockManager.getMinReplication(); + final String err = INodeFile.checkBlockComplete(blocks, i, n, min); + if (err != null) { + final int numNodes = blocks[i].numNodes(); + LOG.info("BLOCK* " + err + "(numNodes= " + numNodes + + (numNodes < min ? 
" < " : " >= ") + + " minimum = " + min + ") in file " + src); + return false; + } + } + return true; + } + + /** * Change the indicated filename. * @deprecated Use {@link #renameTo(String, String, boolean, * Options.Rename...)} instead. @@ -2746,7 +2769,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, List<INode> removedINodes, final boolean acquireINodeMapLock) { assert hasWriteLock(); - leaseManager.removeLeases(removedUCFiles); + for(long i : removedUCFiles) { + leaseManager.removeLease(i); + } // remove inodes from inodesMap if (removedINodes != null) { if (acquireINodeMapLock) { @@ -3054,7 +3079,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, // then reap lease immediately and close the file. if(nrCompleteBlocks == nrBlocks) { finalizeINodeFileUnderConstruction(src, pendingFile, - iip.getLatestSnapshotId()); + iip.getLatestSnapshotId(), false); NameNode.stateChangeLog.warn("BLOCK*" + " internalReleaseLease: All existing blocks are COMPLETE," + " lease removed, file closed."); @@ -3093,7 +3118,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, if(penultimateBlockMinReplication && blockManager.checkMinReplication(lastBlock)) { finalizeINodeFileUnderConstruction(src, pendingFile, - iip.getLatestSnapshotId()); + iip.getLatestSnapshotId(), false); NameNode.stateChangeLog.warn("BLOCK*" + " internalReleaseLease: Committed blocks are minimally replicated," + " lease removed, file closed."); @@ -3137,7 +3162,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, // We can remove this block and close the file. 
pendingFile.removeLastBlock(lastBlock); finalizeINodeFileUnderConstruction(src, pendingFile, - iip.getLatestSnapshotId()); + iip.getLatestSnapshotId(), false); NameNode.stateChangeLog.warn("BLOCK* internalReleaseLease: " + "Removed empty last block and closed file."); return true; @@ -3202,8 +3227,23 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } } - void finalizeINodeFileUnderConstruction( - String src, INodeFile pendingFile, int latestSnapshot) throws IOException { + void addCommittedBlocksToPending(final INodeFile pendingFile) { + final BlockInfo[] blocks = pendingFile.getBlocks(); + int i = blocks.length - numCommittedAllowed; + if (i < 0) { + i = 0; + } + for(; i < blocks.length; i++) { + final BlockInfo b = blocks[i]; + if (b != null && b.getBlockUCState() == BlockUCState.COMMITTED) { + // b is COMMITTED but not yet COMPLETE, add it to pending replication. + blockManager.addExpectedReplicasToPending(b, pendingFile); + } + } + } + + void finalizeINodeFileUnderConstruction(String src, INodeFile pendingFile, + int latestSnapshot, boolean allowCommittedBlock) throws IOException { assert hasWriteLock(); FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature(); @@ -3218,7 +3258,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, // The file is no longer pending. // Create permanent INode, update blocks. No need to replace the inode here // since we just remove the uc feature from pendingFile - pendingFile.toCompleteFile(now()); + pendingFile.toCompleteFile(now(), + allowCommittedBlock? 
numCommittedAllowed: 0, + blockManager.getMinReplication()); waitForLoadingFSImage(); // close file and persist block allocations for this file @@ -3468,8 +3510,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, commitOrCompleteLastBlock(pendingFile, iip, storedBlock); //remove lease, close file - finalizeINodeFileUnderConstruction(src, pendingFile, - Snapshot.findLatestSnapshot(pendingFile, Snapshot.CURRENT_STATE_ID)); + int s = Snapshot.findLatestSnapshot(pendingFile, Snapshot.CURRENT_STATE_ID); + finalizeINodeFileUnderConstruction(src, pendingFile, s, false); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index 2eb9a80..e674c5d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@ -201,28 +201,56 @@ public class INodeFile extends INodeWithAdditionalFields * Convert the file to a complete file, i.e., to remove the Under-Construction * feature. 
*/ - public INodeFile toCompleteFile(long mtime) { - Preconditions.checkState(isUnderConstruction(), - "file is no longer under construction"); - FileUnderConstructionFeature uc = getFileUnderConstructionFeature(); - if (uc != null) { - assertAllBlocksComplete(); - removeFeature(uc); - this.setModificationTime(mtime); - } - return this; + void toCompleteFile(long mtime, int numCommittedAllowed, short minReplication) { + final FileUnderConstructionFeature uc = getFileUnderConstructionFeature(); + Preconditions.checkNotNull(uc, "File %s is not under construction", this); + assertAllBlocksComplete(numCommittedAllowed, minReplication); + removeFeature(uc); + setModificationTime(mtime); } /** Assert all blocks are complete. */ - private void assertAllBlocksComplete() { + private void assertAllBlocksComplete(int numCommittedAllowed, + short minReplication) { if (blocks == null) { return; } for (int i = 0; i < blocks.length; i++) { - Preconditions.checkState(blocks[i].isComplete(), "Failed to finalize" - + " %s %s since blocks[%s] is non-complete, where blocks=%s.", - getClass().getSimpleName(), this, i, Arrays.asList(blocks)); + final String err = checkBlockComplete(blocks, i, numCommittedAllowed, + minReplication); + Preconditions.checkState(err == null, + "Unexpected block state: %s, file=%s (%s), blocks=%s (i=%s)", + err, this, getClass().getSimpleName(), Arrays.asList(blocks), i); + } + } + + /** + * Check if the i-th block is COMPLETE; + * when the i-th block is the last block, it may be allowed to be COMMITTED. + * + * @return null if the block passes the check; + * otherwise, return an error message. 
+ */ + static String checkBlockComplete(BlockInfo[] blocks, int i, + int numCommittedAllowed, short minReplication) { + final BlockInfo b = blocks[i]; + final BlockUCState state = b.getBlockUCState(); + if (state == BlockUCState.COMPLETE) { + return null; + } + if (i < blocks.length - numCommittedAllowed) { + return b + " is " + state + " but not COMPLETE"; } + if (state != BlockUCState.COMMITTED) { + return b + " is " + state + " but neither COMPLETE nor COMMITTED"; + } + final int numExpectedLocations + = b.getUnderConstructionFeature().getNumExpectedLocations(); + if (numExpectedLocations <= minReplication) { + return b + " is " + state + " but numExpectedLocations = " + + numExpectedLocations + " <= minReplication = " + minReplication; + } + return null; } @Override // BlockCollection http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java index 908af45..6bc9e34 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java @@ -160,6 +160,13 @@ public class LeaseManager { return lease; } + synchronized void removeLease(long inodeId) { + final Lease lease = leasesById.get(inodeId); + if (lease != null) { + removeLease(lease, inodeId); + } + } + /** * Remove the specified lease and src. 
*/ @@ -298,16 +305,6 @@ public class LeaseManager { } } - @VisibleForTesting - synchronized void removeLeases(Collection<Long> inodes) { - for (long inode : inodes) { - Lease lease = leasesById.get(inode); - if (lease != null) { - removeLease(lease, inode); - } - } - } - public void setLeasePeriod(long softLimit, long hardLimit) { this.softLimit = softLimit; this.hardLimit = hardLimit; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java index ea1d0a6..84699a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java @@ -27,10 +27,12 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.EnumSet; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.HardLink; @@ -41,12 +43,12 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.util.Time; import org.junit.Assert; import org.junit.Test; @@ -55,6 +57,8 @@ import org.junit.Test; * support HDFS appends. */ public class TestFileAppend{ + private static final long RANDOM_TEST_RUNTIME = 10000; + final boolean simulatedStorage = false; private static byte[] fileContents = null; @@ -381,6 +385,56 @@ public class TestFileAppend{ } } + + @Test + public void testMultipleAppends() throws Exception { + final long startTime = Time.monotonicNow(); + final Configuration conf = new HdfsConfiguration(); + conf.setInt( + DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, 1); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(4).build(); + final DistributedFileSystem fs = cluster.getFileSystem(); + try { + final Path p = new Path("/testMultipleAppend/foo"); + final int blockSize = 1 << 16; + final byte[] data = AppendTestUtil.initBuffer(blockSize); + + // create an empty file. 
+ fs.create(p, true, 4096, (short)3, blockSize).close(); + + int fileLen = 0; + for(int i = 0; + i < 10 || Time.monotonicNow() - startTime < RANDOM_TEST_RUNTIME; + i++) { + int appendLen = ThreadLocalRandom.current().nextInt(100) + 1; + if (fileLen + appendLen > data.length) { + break; + } + + AppendTestUtil.LOG.info(i + ") fileLen=" + fileLen + + ", appendLen=" + appendLen); + final FSDataOutputStream out = fs.append(p); + out.write(data, fileLen, appendLen); + out.close(); + fileLen += appendLen; + } + + Assert.assertEquals(fileLen, fs.getFileStatus(p).getLen()); + final byte[] actual = new byte[fileLen]; + final FSDataInputStream in = fs.open(p); + in.readFully(actual); + in.close(); + for(int i = 0; i < fileLen; i++) { + Assert.assertEquals(data[i], actual[i]); + } + } finally { + fs.close(); + cluster.shutdown(); + } + } + /** Tests appending after soft-limit expires. */ @Test public void testAppendAfterSoftLimit() http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java index 4d93a32..b5bb5d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java @@ -92,6 +92,10 @@ public class TestINodeFile { (short)3, 1024L); } + static void toCompleteFile(INodeFile file) { + file.toCompleteFile(Time.now(), 0, (short)1); + } + INodeFile createINodeFile(short replication, long preferredBlockSize) { return new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID, null, perm, 0L, 0L, null, replication, preferredBlockSize, 
(byte)0); @@ -1089,7 +1093,7 @@ public class TestINodeFile { assertEquals(clientName, uc.getClientName()); assertEquals(clientMachine, uc.getClientMachine()); - file.toCompleteFile(Time.now()); + toCompleteFile(file); assertFalse(file.isUnderConstruction()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad3c51d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java index de30161..3bb7bb7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java @@ -51,8 +51,8 @@ public class TestLeaseManager { } assertEquals(4, lm.getINodeIdWithLeases().size()); - synchronized (lm) { - lm.removeLeases(ids); + for (long id : ids) { + lm.removeLease(id); } assertEquals(0, lm.getINodeIdWithLeases().size()); }