[partial-ns] Implement startFile().
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e9c9c72b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e9c9c72b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e9c9c72b Branch: refs/heads/feature-HDFS-8286 Commit: e9c9c72b83b4cf8f7bd2dde8d2c0086dec6e7890 Parents: 72e1828 Author: Haohui Mai <whe...@apache.org> Authored: Tue May 19 13:29:31 2015 -0700 Committer: Haohui Mai <whe...@apache.org> Committed: Fri Jun 12 13:56:57 2015 -0700 ---------------------------------------------------------------------- .../hdfs/server/namenode/FSDirWriteFileOp.java | 288 ++++++++++--------- .../hadoop/hdfs/server/namenode/FSEditLog.java | 75 +++-- .../hdfs/server/namenode/FSEditLogOp.java | 12 +- .../hdfs/server/namenode/RWTransaction.java | 5 + .../hdfs/server/namenode/CreateEditsLog.java | 26 +- .../hdfs/server/namenode/TestEditLog.java | 25 +- 6 files changed, 240 insertions(+), 191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c9c72b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index d7c463a..33e31e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; import org.apache.commons.io.Charsets; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.crypto.CipherSuite; @@ -26,6 +27,7 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.FsAction; @@ -47,13 +49,13 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderCon import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.net.Node; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.util.ChunkedArrayList; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -335,105 +337,147 @@ class FSDirWriteFileOp { boolean isRawPath = FSDirectory.isReservedRawName(src); FSDirectory fsd = fsn.getFSDirectory(); - byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); - src = fsd.resolvePath(pc, src, pathComponents); - INodesInPath iip = fsd.getINodesInPath4Write(src); + final StringMap ugid = fsd.ugid(); - // Verify that the destination does not exist as a directory already. - final INode inode = iip.getLastINode(); - if (inode != null && inode.isDirectory()) { - throw new FileAlreadyExistsException(src + - " already exists as a directory"); - } + try (RWTransaction tx = fsd.newRWTransaction().begin()) { + Resolver.Result paths = Resolver.resolve(tx, src); + if (paths.invalidPath()) { + throw new InvalidPathException(src); + } + + final FlatINodesInPath iip = paths.inodesInPath(); + // Verify that the destination does not exist as a directory already. + if (paths.ok()) { + FlatINode inode = paths.inodesInPath().getLastINode(); + if (inode.isDirectory()) { + throw new FileAlreadyExistsException(src + + " already exists as a directory"); + } - final INodeFile myFile = INodeFile.valueOf(inode, src, true); - if (fsd.isPermissionEnabled()) { - if (overwrite && myFile != null) { - fsd.checkPathAccess(pc, iip, FsAction.WRITE); + if (fsd.isPermissionEnabled()) { + if (overwrite) { + fsd.checkPathAccess(pc, iip, FsAction.WRITE); + } + } } + + if (fsd.isPermissionEnabled()) { /* * To overwrite existing file, need to check 'w' permission * of parent (equals to ancestor in this case) */ - fsd.checkAncestorAccess(pc, iip, FsAction.WRITE); - } - - if (!createParent) { - fsd.verifyParentDir(iip, src); - } - - if (myFile == null && !create) { - throw new FileNotFoundException("Can't overwrite non-existent " + - src + " for client " + clientMachine); - } - - FileEncryptionInfo feInfo = null; + fsd.checkAncestorAccess(pc, paths, FsAction.WRITE); + } - final EncryptionZone zone = fsd.getEZForPath(iip); - if (zone != null) { - // The path is now within an EZ, but we're missing encryption parameters - if (suite == null || edek == null) { - throw new RetryStartFileException(); + if (!createParent && FlatNSUtil.hasNextLevelInPath(paths.src, paths + .offset)) { + throw new FileNotFoundException(paths.src.substring(0, paths.offset)); } - // Path is within an EZ and we have provided encryption parameters. - // Make sure that the generated EDEK matches the settings of the EZ. - final String ezKeyName = zone.getKeyName(); - if (!ezKeyName.equals(edek.getEncryptionKeyName())) { - throw new RetryStartFileException(); + + if (paths.notFound() && !create) { + throw new FileNotFoundException("Can't overwrite non-existent " + + src + " for client " + clientMachine); } - feInfo = new FileEncryptionInfo(suite, version, - edek.getEncryptedKeyVersion().getMaterial(), - edek.getEncryptedKeyIv(), - ezKeyName, edek.getEncryptionKeyVersionName()); - } - if (myFile != null) { - if (overwrite) { - // TODO -// List<INode> toRemoveINodes = new ChunkedArrayList<>(); -// List<Long> toRemoveUCFiles = new ChunkedArrayList<>(); -// long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks, -// toRemoveINodes, toRemoveUCFiles, now()); -// if (ret >= 0) { -// iip = INodesInPath.replace(iip, iip.length() - 1, null); -// FSDirDeleteOp.incrDeletedFileCount(ret); -// fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true); + // TODO: Handle encryption + FileEncryptionInfo feInfo = null; + +// final EncryptionZone zone = fsd.getEZForPath(iip); +// if (zone != null) { +// // The path is now within an EZ, but we're missing encryption parameters +// if (suite == null || edek == null) { +// throw new RetryStartFileException(); // } - } else { - // If lease soft limit time is expired, recover the lease - fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip, - src, holder, clientMachine, false); - throw new FileAlreadyExistsException(src + " for client " + - clientMachine + " already exists"); +// // Path is within an EZ and we have provided encryption parameters. +// // Make sure that the generated EDEK matches the settings of the EZ. +// final String ezKeyName = zone.getKeyName(); +// if (!ezKeyName.equals(edek.getEncryptionKeyName())) { +// throw new RetryStartFileException(); +// } +// feInfo = new FileEncryptionInfo(suite, version, +// edek.getEncryptedKeyVersion().getMaterial(), +// edek.getEncryptedKeyIv(), +// ezKeyName, edek.getEncryptionKeyVersionName()); +// } + + if (paths.ok()) { + if (overwrite) { + // TODO + List<Long> toRemoveUCFiles = new ChunkedArrayList<>(); + long ret = FSDirDeleteOp.delete(tx, paths, toRemoveBlocks, + toRemoveUCFiles, now()); + if (ret >= 0) { + FSDirDeleteOp.incrDeletedFileCount(ret); + fsn.removeLeases(toRemoveUCFiles); + } + } else { + // TODO + // If lease soft limit time is expired, recover the lease +// fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip, +// src, holder, clientMachine, false); + throw new FileAlreadyExistsException(src + " for client " + + clientMachine + " already exists"); + } } + fsn.checkFsObjectLimit(); + paths = Resolver.resolve(tx, src); + Map.Entry<FlatINodesInPath, String> parent = FSDirMkdirOp + .createAncestorDirectories(tx, fsd, paths, permissions); + long newId = tx.allocateNewInodeId(); + FlatINodeFileFeature.Builder fileFeatureBuilder = new FlatINodeFileFeature + .Builder() + .replication(replication) + .blockSize(blockSize) + .inConstruction(true) + .clientName(holder) + .clientMachine(clientMachine); + + setNewINodeStoragePolicy(fsn.getBlockManager(), fileFeatureBuilder, + isLazyPersist); + + int userId = tx.getStringId(permissions.getUserName()); + FlatINode parentINode = parent.getKey().getLastINode(); + int groupId = permissions.getGroupName() == null + ? parentINode.groupId() + : tx.getStringId(permissions.getGroupName()); + + FlatINodeFileFeature fileFeature = FlatINodeFileFeature.wrap( + fileFeatureBuilder.build()); + ByteString b = new FlatINode.Builder() + .id(newId) + .type(FlatINode.Type.FILE) + .parentId(parentINode.id()) + .mtime(now()) + .userId(userId) + .groupId(groupId) + .permission(permissions.getPermission().toShort()) + .addFeature(fileFeature) + .build(); + + FlatINode newNode = FlatINode.wrap(b); + // TODO: check .reserved path, quotas and ACL + byte[] localName = parent.getValue().getBytes(Charsets.UTF_8); + tx.putINode(newId, b); + tx.putChild(parentINode.id(), ByteBuffer.wrap(localName), newId); + + ByteString newParent = new FlatINode.Builder().mergeFrom(parentINode) + .mtime(now()).build(); + tx.putINode(parentINode.id(), newParent); + + fsn.leaseManager.addLease(holder, newId); +// if (feInfo != null) { +// fsd.setFileEncryptionInfo(src, feInfo); +// newNode = fsd.getInode(newNode.getId()).asFile(); +// } + tx.logOpenFile(fsd.ugid(), src, newNode, overwrite, logRetryEntry); + if (NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " + + src + " inode " + newId + " " + holder); + } + tx.commit(); + return FSDirStatAndListingOp.createFileStatus( + tx, fsd, newNode, localName, fileFeature.storagePolicyId()); } - fsn.checkFsObjectLimit(); - INodeFile newNode = null; - Map.Entry<INodesInPath, String> parent = FSDirMkdirOp - .createAncestorDirectories(fsd, iip, permissions); - if (parent != null) { - iip = addFile(fsd, parent.getKey(), parent.getValue(), permissions, - replication, blockSize, holder, clientMachine); - newNode = iip != null ? iip.getLastINode().asFile() : null; - } - if (newNode == null) { - throw new IOException("Unable to add " + src + " to namespace"); - } - fsn.leaseManager.addLease( - newNode.getFileUnderConstructionFeature().getClientName(), - newNode.getId()); - if (feInfo != null) { - fsd.setFileEncryptionInfo(src, feInfo); - newNode = fsd.getInode(newNode.getId()).asFile(); - } - setNewINodeStoragePolicy(fsn.getBlockManager(), newNode, iip, - isLazyPersist); - fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry); - if (NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " + - src + " inode " + newNode.getId() + " " + holder); - } - return FSDirStatAndListingOp.getFileInfo(fsd, src, false, isRawPath, true); } static EncryptionKeyInfo getEncryptionKeyInfo(FSNamesystem fsn, @@ -518,7 +562,7 @@ class FSDirWriteFileOp { // check quota limits and updated space consumed fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(), - fileINode.getPreferredBlockReplication(), true); + fileINode.getFileReplication(), true); // associate new last block for the file BlockInfoContiguousUnderConstruction blockInfo = @@ -542,41 +586,6 @@ class FSDirWriteFileOp { } } - /** - * Add the given filename to the fs. - * @return the new INodesInPath instance that contains the new INode - */ - private static INodesInPath addFile( - FSDirectory fsd, INodesInPath existing, String localName, - PermissionStatus permissions, short replication, long preferredBlockSize, - String clientName, String clientMachine) - throws IOException { - - long modTime = now(); - INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions, - modTime, modTime, replication, preferredBlockSize); - newNode.setLocalName(localName.getBytes(Charsets.UTF_8)); - newNode.toUnderConstruction(clientName, clientMachine); - - INodesInPath newiip; - fsd.writeLock(); - try { - newiip = fsd.addINode(existing, newNode); - } finally { - fsd.writeUnlock(); - } - if (newiip == null) { - NameNode.stateChangeLog.info("DIR* addFile: failed to add " + - existing.getPath() + "/" + localName); - return null; - } - - if(NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug("DIR* addFile: " + localName + " is added"); - } - return newiip; - } - private static FileState analyzeFileState( FSNamesystem fsn, String src, long fileId, String clientName, ExtendedBlock previous, LocatedBlock[] onRetryBlock) @@ -647,7 +656,7 @@ class FSDirWriteFileOp { NameNode.stateChangeLog.debug( "BLOCK* NameSystem.allocateBlock: handling block allocation" + " writing to a file with a complete previous block: src=" + - src + " lastBlock=" + lastBlockInFile); + src + " lastBlock=" + lastBlockInFile); } } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) { if (lastBlockInFile.getNumBytes() != 0) { @@ -672,7 +681,7 @@ class FSDirWriteFileOp { // Case 3 throw new IOException("Cannot allocate block in " + src + ": " + "passed 'previous' block " + previous + " does not match actual " + - "last block in file " + lastBlockInFile); + "last block in file " + lastBlockInFile); } } return new FileState(file, src, iip); @@ -767,12 +776,6 @@ class FSDirWriteFileOp { storagePolicyId); } - private static INodeFile newINodeFile(long id, PermissionStatus permissions, - long mtime, long atime, short replication, long preferredBlockSize) { - return newINodeFile(id, permissions, mtime, atime, replication, - preferredBlockSize, (byte)0); - } - /** * Persist the new block (the last block of the given file). */ @@ -809,8 +812,8 @@ class FSDirWriteFileOp { DatanodeStorageInfo.incrementBlocksScheduled(targets); } - private static void setNewINodeStoragePolicy(BlockManager bm, INodeFile - inode, INodesInPath iip, boolean isLazyPersist) + private static void setNewINodeStoragePolicy( + BlockManager bm, FlatINodeFileFeature.Builder file, boolean isLazyPersist) throws IOException { if (isLazyPersist) { @@ -824,18 +827,17 @@ class FSDirWriteFileOp { "The LAZY_PERSIST storage policy has been disabled " + "by the administrator."); } - inode.setStoragePolicyID(lpPolicy.getId(), - iip.getLatestSnapshotId()); + file.storagePolicyId(lpPolicy.getId()); } else { - BlockStoragePolicy effectivePolicy = - bm.getStoragePolicy(inode.getStoragePolicyID()); - - if (effectivePolicy != null && - effectivePolicy.isCopyOnCreateFile()) { - // Copy effective policy from ancestor directory to current file. - inode.setStoragePolicyID(effectivePolicy.getId(), - iip.getLatestSnapshotId()); - } + // TODO: handle effective storage policy id +// BlockStoragePolicy effectivePolicy = +// bm.getStoragePolicy(parent.getLastINode().storagePolicyId()); +// +// if (effectivePolicy != null && +// effectivePolicy.isCopyOnCreateFile()) { +// // Copy effective policy from ancestor directory to current file. +// file.storagePolicyId(effectivePolicy.getId()); +// } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c9c72b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index a9f0c3e..370050d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -722,40 +722,41 @@ public class FSEditLog implements LogsPurgeable { * Add open lease record to edit log. * Records the block locations of the last block. */ - public void logOpenFile(String path, INodeFile newNode, boolean overwrite, - boolean toLogRpcIds) { - Preconditions.checkArgument(newNode.isUnderConstruction()); - PermissionStatus permissions = newNode.getPermissionStatus(); + public void logOpenFile(StringMap ugid, String path, FlatINode inode, + boolean overwrite, boolean toLogRpcIds) { + FlatINodeFileFeature file = inode.feature(FlatINodeFileFeature.class); + Preconditions.checkArgument(file != null && file.inConstruction()); + PermissionStatus permissions = inode.permissionStatus(ugid); AddOp op = AddOp.getInstance(cache.get()) - .setInodeId(newNode.getId()) + .setInodeId(inode.id()) .setPath(path) - .setReplication(newNode.getFileReplication()) - .setModificationTime(newNode.getModificationTime()) - .setAccessTime(newNode.getAccessTime()) - .setBlockSize(newNode.getPreferredBlockSize()) - .setBlocks(newNode.getBlocks()) + .setReplication(file.replication()) + .setModificationTime(inode.mtime()) + .setAccessTime(inode.atime()) + .setBlockSize(file.blockSize()) + .setBlocks(file.blocks()) .setPermissionStatus(permissions) - .setClientName(newNode.getFileUnderConstructionFeature().getClientName()) - .setClientMachine( - newNode.getFileUnderConstructionFeature().getClientMachine()) + .setClientName(file.clientName()) + .setClientMachine(file.clientMachine()) .setOverwrite(overwrite) - .setStoragePolicyId(newNode.getLocalStoragePolicyID()); - - AclFeature f = newNode.getAclFeature(); - if (f != null) { - op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode)); - } + .setStoragePolicyId(file.storagePolicyId()); - XAttrFeature x = newNode.getXAttrFeature(); - if (x != null) { - op.setXAttrs(x.getXAttrs()); - } + // TODO: Handle ACL / xattrs +// AclFeature f = inode.getAclFeature(); +// if (f != null) { +// op.setAclEntries(AclStorage.readINodeLogicalAcl(inode)); +// } +// +// XAttrFeature x = inode.getXAttrFeature(); +// if (x != null) { +// op.setXAttrs(x.getXAttrs()); +// } logRpcIds(op, toLogRpcIds); logEdit(op); } - /** + /** * Add close lease record to edit log. */ public void logCloseFile(String path, INodeFile newNode) { @@ -767,10 +768,10 @@ public class FSEditLog implements LogsPurgeable { .setBlockSize(newNode.getPreferredBlockSize()) .setBlocks(newNode.getBlocks()) .setPermissionStatus(newNode.getPermissionStatus()); - + logEdit(op); } - + public void logAddBlock(String path, INodeFile file) { Preconditions.checkArgument(file.isUnderConstruction()); BlockInfoContiguous[] blocks = file.getBlocks(); @@ -781,7 +782,24 @@ public class FSEditLog implements LogsPurgeable { .setPenultimateBlock(pBlock).setLastBlock(lastBlock); logEdit(op); } - + + /** + * Add close lease record to edit log. + */ + public void logCloseFile(StringMap ugid, String path, FlatINode inode) { + Preconditions.checkArgument(inode.isFile()); + FlatINodeFileFeature file = inode.feature(FlatINodeFileFeature.class); + CloseOp op = CloseOp.getInstance(cache.get()) + .setPath(path).setReplication(file.replication()) + .setModificationTime(inode.mtime()) + .setAccessTime(inode.atime()) + .setBlockSize(file.blockSize()) + .setBlocks(file.blocks()) + .setPermissionStatus(inode.permissionStatus(ugid)); + + logEdit(op); + } + public void logUpdateBlocks(String path, INodeFile file, boolean toLogRpcIds) { Preconditions.checkArgument(file.isUnderConstruction()); UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get()) @@ -798,8 +816,7 @@ public class FSEditLog implements LogsPurgeable { PermissionStatus permissions = newNode.getPermissionStatus(); MkdirOp op = MkdirOp.getInstance(cache.get()) .setInodeId(newNode.getId()) - .setPath(path) - .setTimestamp(newNode.getModificationTime()) + .setPath(path).setTimestamp(newNode.getModificationTime()) .setPermissionStatus(permissions); AclFeature f = newNode.getAclFeature(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c9c72b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java index c8e565e..d73c3d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java @@ -499,7 +499,17 @@ public abstract class FSEditLogOp { this.blocks = blocks; return (T)this; } - + + <T extends AddCloseOp> T setBlocks(Iterable<Block> blocks) { + ArrayList<Block> b = Lists.newArrayList(blocks); + if (b.size() > MAX_BLOCKS) { + throw new RuntimeException("Can't have more than " + MAX_BLOCKS + + " in an AddCloseOp."); + } + this.blocks = b.toArray(new Block[b.size()]); + return (T)this; + } + @Override public Block[] getBlocks() { return blocks; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c9c72b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java index 80379c4..b97b11f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java @@ -154,4 +154,9 @@ class RWTransaction extends Transaction { Options.Rename[] options) { fsd.getEditLog().logRename(src, dst, mtime, logRetryCache, options); } + + public void logOpenFile(StringMap ugid, String src, FlatINode inode, + boolean overwrite, boolean logRetryCache) { + fsd.getEditLog().logOpenFile(ugid, src, inode, overwrite, logRetryCache); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c9c72b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java index 0349251..21a8265 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.File; import java.io.IOException; +import com.google.protobuf.ByteString; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.protocol.Block; @@ -59,7 +60,7 @@ public class CreateEditsLog { static void addFiles(FSEditLog editLog, int numFiles, short replication, int blocksPerFile, long startingBlockId, long blockSize, FileNameGenerator nameGenerator) { - + StringMap ugid = new StringMap(); PermissionStatus p = new PermissionStatus("joeDoe", "people", new FsPermission((short)0777)); INodeId inodeId = new INodeId(); @@ -81,11 +82,7 @@ public class CreateEditsLog { blocks[iB].setBlockId(currentBlockId++); } - final INodeFile inode = new INodeFile(inodeId.nextValue(), null, - p, 0L, 0L, blocks, replication, blockSize); - inode.toUnderConstruction("", ""); - - // Append path to filename with information about blockIDs + // Append path to filename with information about blockIDs String path = "_" + iF + "_B" + blocks[0].getBlockId() + "_to_B" + blocks[blocksPerFile-1].getBlockId() + "_"; String filePath = nameGenerator.getNextFileName(""); @@ -96,11 +93,18 @@ public class CreateEditsLog { dirInode = new INodeDirectory(inodeId.nextValue(), null, p, 0L); editLog.logMkDir(currentDir, dirInode); } - INodeFile fileUc = new INodeFile(inodeId.nextValue(), null, - p, 0L, 0L, BlockInfoContiguous.EMPTY_ARRAY, replication, blockSize); - fileUc.toUnderConstruction("", ""); - editLog.logOpenFile(filePath, fileUc, false, false); - editLog.logCloseFile(filePath, inode); + ByteString file = new FlatINodeFileFeature.Builder() + .replication(replication) + .blockSize(blockSize) + .build(); + ByteString inode = new FlatINode.Builder() + .id(inodeId.nextValue()) + .userId(ugid.getId(p.getUserName())) + .groupId(ugid.getId(p.getGroupName())) + .addFeature(FlatINodeFileFeature.wrap(file)) + .build(); + editLog.logOpenFile(ugid, filePath, FlatINode.wrap(inode), false, false); + editLog.logCloseFile(ugid, filePath, FlatINode.wrap(inode)); if (currentBlockId - bidAtSync >= 2000) { // sync every 2K blocks editLog.logSync(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c9c72b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java index 1e42e34..bbd5f5a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java @@ -53,6 +53,7 @@ import java.util.concurrent.Executors; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.protobuf.ByteString; import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -202,14 +203,24 @@ public class TestEditLog { PermissionStatus p = namesystem.createFsOwnerPermissions( new FsPermission((short)0777)); FSEditLog editLog = namesystem.getEditLog(); - + FSDirectory fsd = namesystem.getFSDirectory(); + StringMap ugid = namesystem.getFSDirectory().ugid(); for (int i = 0; i < numTransactions; i++) { - INodeFile inode = new INodeFile(namesystem.dir.allocateNewInodeId(), null, - p, 0L, 0L, BlockInfoContiguous.EMPTY_ARRAY, replication, blockSize); - inode.toUnderConstruction("", ""); - - editLog.logOpenFile("/filename" + (startIndex + i), inode, false, false); - editLog.logCloseFile("/filename" + (startIndex + i), inode); + ByteString file = new FlatINodeFileFeature.Builder() + .replication(replication) + .blockSize(blockSize) + .build(); + ByteString inodeBytes = new FlatINode.Builder() + .id(fsd.allocateNewInodeId()) + .userId(ugid.getId(p.getUserName())) + .groupId(ugid.getId(p.getGroupName())) + .addFeature(FlatINodeFileFeature.wrap(file)) + .build(); + FlatINode inode = FlatINode.wrap(inodeBytes); + + editLog.logOpenFile(ugid, "/filename" + (startIndex + i), inode, false, + false); + editLog.logCloseFile(ugid, "/filename" + (startIndex + i), inode); editLog.logSync(); } }