[partial-ns] Implement startFile().

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e9c9c72b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e9c9c72b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e9c9c72b

Branch: refs/heads/feature-HDFS-8286
Commit: e9c9c72b83b4cf8f7bd2dde8d2c0086dec6e7890
Parents: 72e1828
Author: Haohui Mai <whe...@apache.org>
Authored: Tue May 19 13:29:31 2015 -0700
Committer: Haohui Mai <whe...@apache.org>
Committed: Fri Jun 12 13:56:57 2015 -0700

----------------------------------------------------------------------
 .../hdfs/server/namenode/FSDirWriteFileOp.java  | 288 ++++++++++---------
 .../hadoop/hdfs/server/namenode/FSEditLog.java  |  75 +++--
 .../hdfs/server/namenode/FSEditLogOp.java       |  12 +-
 .../hdfs/server/namenode/RWTransaction.java     |   5 +
 .../hdfs/server/namenode/CreateEditsLog.java    |  26 +-
 .../hdfs/server/namenode/TestEditLog.java       |  25 +-
 6 files changed, 240 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c9c72b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index d7c463a..33e31e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
 import org.apache.commons.io.Charsets;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.crypto.CipherSuite;
@@ -26,6 +27,7 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -47,13 +49,13 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderCon
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.ChunkedArrayList;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -335,105 +337,147 @@ class FSDirWriteFileOp {
 
     boolean isRawPath = FSDirectory.isReservedRawName(src);
     FSDirectory fsd = fsn.getFSDirectory();
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    src = fsd.resolvePath(pc, src, pathComponents);
-    INodesInPath iip = fsd.getINodesInPath4Write(src);
+    final StringMap ugid = fsd.ugid();
 
-    // Verify that the destination does not exist as a directory already.
-    final INode inode = iip.getLastINode();
-    if (inode != null && inode.isDirectory()) {
-      throw new FileAlreadyExistsException(src +
-          " already exists as a directory");
-    }
+    try (RWTransaction tx = fsd.newRWTransaction().begin()) {
+      Resolver.Result paths = Resolver.resolve(tx, src);
+      if (paths.invalidPath()) {
+        throw new InvalidPathException(src);
+      }
+
+      final FlatINodesInPath iip = paths.inodesInPath();
+      // Verify that the destination does not exist as a directory already.
+      if (paths.ok()) {
+        FlatINode inode = paths.inodesInPath().getLastINode();
+        if (inode.isDirectory()) {
+          throw new FileAlreadyExistsException(src +
+              " already exists as a directory");
+        }
 
-    final INodeFile myFile = INodeFile.valueOf(inode, src, true);
-    if (fsd.isPermissionEnabled()) {
-      if (overwrite && myFile != null) {
-        fsd.checkPathAccess(pc, iip, FsAction.WRITE);
+        if (fsd.isPermissionEnabled()) {
+          if (overwrite) {
+            fsd.checkPathAccess(pc, iip, FsAction.WRITE);
+          }
+        }
       }
+
+      if (fsd.isPermissionEnabled()) {
       /*
        * To overwrite existing file, need to check 'w' permission
        * of parent (equals to ancestor in this case)
        */
-      fsd.checkAncestorAccess(pc, iip, FsAction.WRITE);
-    }
-
-    if (!createParent) {
-      fsd.verifyParentDir(iip, src);
-    }
-
-    if (myFile == null && !create) {
-      throw new FileNotFoundException("Can't overwrite non-existent " +
-          src + " for client " + clientMachine);
-    }
-
-    FileEncryptionInfo feInfo = null;
+        fsd.checkAncestorAccess(pc, paths, FsAction.WRITE);
+      }
 
-    final EncryptionZone zone = fsd.getEZForPath(iip);
-    if (zone != null) {
-      // The path is now within an EZ, but we're missing encryption parameters
-      if (suite == null || edek == null) {
-        throw new RetryStartFileException();
+      if (!createParent && FlatNSUtil.hasNextLevelInPath(paths.src, paths
+          .offset)) {
+        throw new FileNotFoundException(paths.src.substring(0, paths.offset));
       }
-      // Path is within an EZ and we have provided encryption parameters.
-      // Make sure that the generated EDEK matches the settings of the EZ.
-      final String ezKeyName = zone.getKeyName();
-      if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
-        throw new RetryStartFileException();
+
+      if (paths.notFound() && !create) {
+        throw new FileNotFoundException("Can't overwrite non-existent " +
+            src + " for client " + clientMachine);
       }
-      feInfo = new FileEncryptionInfo(suite, version,
-          edek.getEncryptedKeyVersion().getMaterial(),
-          edek.getEncryptedKeyIv(),
-          ezKeyName, edek.getEncryptionKeyVersionName());
-    }
 
-    if (myFile != null) {
-      if (overwrite) {
-        // TODO
-//        List<INode> toRemoveINodes = new ChunkedArrayList<>();
-//        List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
-//        long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
-//                                        toRemoveINodes, toRemoveUCFiles, now());
-//        if (ret >= 0) {
-//          iip = INodesInPath.replace(iip, iip.length() - 1, null);
-//          FSDirDeleteOp.incrDeletedFileCount(ret);
-//          fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
+      // TODO: Handle encryption
+      FileEncryptionInfo feInfo = null;
+
+//      final EncryptionZone zone = fsd.getEZForPath(iip);
+//      if (zone != null) {
+//        // The path is now within an EZ, but we're missing encryption parameters
+//        if (suite == null || edek == null) {
+//          throw new RetryStartFileException();
 //        }
-      } else {
-        // If lease soft limit time is expired, recover the lease
-        fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
-                                 src, holder, clientMachine, false);
-        throw new FileAlreadyExistsException(src + " for client " +
-            clientMachine + " already exists");
+//        // Path is within an EZ and we have provided encryption parameters.
+//        // Make sure that the generated EDEK matches the settings of the EZ.
+//        final String ezKeyName = zone.getKeyName();
+//        if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
+//          throw new RetryStartFileException();
+//        }
+//        feInfo = new FileEncryptionInfo(suite, version,
+//                                        edek.getEncryptedKeyVersion().getMaterial(),
+//                                        edek.getEncryptedKeyIv(),
+//                                        ezKeyName, edek.getEncryptionKeyVersionName());
+//      }
+
+      if (paths.ok()) {
+        if (overwrite) {
+          // TODO
+          List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
+          long ret = FSDirDeleteOp.delete(tx, paths, toRemoveBlocks,
+                                          toRemoveUCFiles, now());
+          if (ret >= 0) {
+            FSDirDeleteOp.incrDeletedFileCount(ret);
+            fsn.removeLeases(toRemoveUCFiles);
+          }
+        } else {
+          // TODO
+          // If lease soft limit time is expired, recover the lease
+//          fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
+//                                   src, holder, clientMachine, false);
+          throw new FileAlreadyExistsException(src + " for client " +
+                                                   clientMachine + " already exists");
+        }
       }
+      fsn.checkFsObjectLimit();
+      paths = Resolver.resolve(tx, src);
+      Map.Entry<FlatINodesInPath, String> parent = FSDirMkdirOp
+          .createAncestorDirectories(tx, fsd, paths, permissions);
+      long newId = tx.allocateNewInodeId();
+      FlatINodeFileFeature.Builder fileFeatureBuilder = new FlatINodeFileFeature
+          .Builder()
+          .replication(replication)
+          .blockSize(blockSize)
+          .inConstruction(true)
+          .clientName(holder)
+          .clientMachine(clientMachine);
+
+      setNewINodeStoragePolicy(fsn.getBlockManager(), fileFeatureBuilder,
+                               isLazyPersist);
+
+      int userId = tx.getStringId(permissions.getUserName());
+      FlatINode parentINode = parent.getKey().getLastINode();
+      int groupId = permissions.getGroupName() == null
+          ? parentINode.groupId()
+          : tx.getStringId(permissions.getGroupName());
+
+      FlatINodeFileFeature fileFeature = FlatINodeFileFeature.wrap(
+          fileFeatureBuilder.build());
+      ByteString b = new FlatINode.Builder()
+          .id(newId)
+          .type(FlatINode.Type.FILE)
+          .parentId(parentINode.id())
+          .mtime(now())
+          .userId(userId)
+          .groupId(groupId)
+          .permission(permissions.getPermission().toShort())
+          .addFeature(fileFeature)
+          .build();
+
+      FlatINode newNode = FlatINode.wrap(b);
+      // TODO: check .reserved path, quotas and ACL
+      byte[] localName = parent.getValue().getBytes(Charsets.UTF_8);
+      tx.putINode(newId, b);
+      tx.putChild(parentINode.id(), ByteBuffer.wrap(localName), newId);
+
+      ByteString newParent = new FlatINode.Builder().mergeFrom(parentINode)
+          .mtime(now()).build();
+      tx.putINode(parentINode.id(), newParent);
+
+      fsn.leaseManager.addLease(holder, newId);
+//      if (feInfo != null) {
+//        fsd.setFileEncryptionInfo(src, feInfo);
+//        newNode = fsd.getInode(newNode.getId()).asFile();
+//      }
+      tx.logOpenFile(fsd.ugid(), src, newNode, overwrite, logRetryEntry);
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
+            src + " inode " + newId + " " + holder);
+      }
+      tx.commit();
+      return FSDirStatAndListingOp.createFileStatus(
+          tx, fsd, newNode, localName, fileFeature.storagePolicyId());
     }
-    fsn.checkFsObjectLimit();
-    INodeFile newNode = null;
-    Map.Entry<INodesInPath, String> parent = FSDirMkdirOp
-        .createAncestorDirectories(fsd, iip, permissions);
-    if (parent != null) {
-      iip = addFile(fsd, parent.getKey(), parent.getValue(), permissions,
-                    replication, blockSize, holder, clientMachine);
-      newNode = iip != null ? iip.getLastINode().asFile() : null;
-    }
-    if (newNode == null) {
-      throw new IOException("Unable to add " + src +  " to namespace");
-    }
-    fsn.leaseManager.addLease(
-        newNode.getFileUnderConstructionFeature().getClientName(),
-        newNode.getId());
-    if (feInfo != null) {
-      fsd.setFileEncryptionInfo(src, feInfo);
-      newNode = fsd.getInode(newNode.getId()).asFile();
-    }
-    setNewINodeStoragePolicy(fsn.getBlockManager(), newNode, iip,
-                             isLazyPersist);
-    fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
-          src + " inode " + newNode.getId() + " " + holder);
-    }
-    return FSDirStatAndListingOp.getFileInfo(fsd, src, false, isRawPath, true);
   }
 
   static EncryptionKeyInfo getEncryptionKeyInfo(FSNamesystem fsn,
@@ -518,7 +562,7 @@ class FSDirWriteFileOp {
 
       // check quota limits and updated space consumed
       fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
-          fileINode.getPreferredBlockReplication(), true);
+          fileINode.getFileReplication(), true);
 
       // associate new last block for the file
       BlockInfoContiguousUnderConstruction blockInfo =
@@ -542,41 +586,6 @@ class FSDirWriteFileOp {
     }
   }
 
-  /**
-   * Add the given filename to the fs.
-   * @return the new INodesInPath instance that contains the new INode
-   */
-  private static INodesInPath addFile(
-      FSDirectory fsd, INodesInPath existing, String localName,
-      PermissionStatus permissions, short replication, long preferredBlockSize,
-      String clientName, String clientMachine)
-      throws IOException {
-
-    long modTime = now();
-    INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
-                                     modTime, modTime, replication, preferredBlockSize);
-    newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
-    newNode.toUnderConstruction(clientName, clientMachine);
-
-    INodesInPath newiip;
-    fsd.writeLock();
-    try {
-      newiip = fsd.addINode(existing, newNode);
-    } finally {
-      fsd.writeUnlock();
-    }
-    if (newiip == null) {
-      NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
-                                       existing.getPath() + "/" + localName);
-      return null;
-    }
-
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* addFile: " + localName + " is added");
-    }
-    return newiip;
-  }
-
   private static FileState analyzeFileState(
       FSNamesystem fsn, String src, long fileId, String clientName,
       ExtendedBlock previous, LocatedBlock[] onRetryBlock)
@@ -647,7 +656,7 @@ class FSDirWriteFileOp {
            NameNode.stateChangeLog.debug(
                "BLOCK* NameSystem.allocateBlock: handling block allocation" +
                " writing to a file with a complete previous block: src=" +
-               src + " lastBlock=" + lastBlockInFile);
+                   src + " lastBlock=" + lastBlockInFile);
         }
       } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
         if (lastBlockInFile.getNumBytes() != 0) {
@@ -672,7 +681,7 @@ class FSDirWriteFileOp {
         // Case 3
         throw new IOException("Cannot allocate block in " + src + ": " +
             "passed 'previous' block " + previous + " does not match actual " +
-            "last block in file " + lastBlockInFile);
+                                  "last block in file " + lastBlockInFile);
       }
     }
     return new FileState(file, src, iip);
@@ -767,12 +776,6 @@ class FSDirWriteFileOp {
         storagePolicyId);
   }
 
-  private static INodeFile newINodeFile(long id, PermissionStatus permissions,
-      long mtime, long atime, short replication, long preferredBlockSize) {
-    return newINodeFile(id, permissions, mtime, atime, replication,
-        preferredBlockSize, (byte)0);
-  }
-
   /**
    * Persist the new block (the last block of the given file).
    */
@@ -809,8 +812,8 @@ class FSDirWriteFileOp {
     DatanodeStorageInfo.incrementBlocksScheduled(targets);
   }
 
-  private static void setNewINodeStoragePolicy(BlockManager bm, INodeFile
-      inode, INodesInPath iip, boolean isLazyPersist)
+  private static void setNewINodeStoragePolicy(
+      BlockManager bm, FlatINodeFileFeature.Builder file, boolean isLazyPersist)
       throws IOException {
 
     if (isLazyPersist) {
@@ -824,18 +827,17 @@ class FSDirWriteFileOp {
             "The LAZY_PERSIST storage policy has been disabled " +
             "by the administrator.");
       }
-      inode.setStoragePolicyID(lpPolicy.getId(),
-                                 iip.getLatestSnapshotId());
+      file.storagePolicyId(lpPolicy.getId());
     } else {
-      BlockStoragePolicy effectivePolicy =
-          bm.getStoragePolicy(inode.getStoragePolicyID());
-
-      if (effectivePolicy != null &&
-          effectivePolicy.isCopyOnCreateFile()) {
-        // Copy effective policy from ancestor directory to current file.
-        inode.setStoragePolicyID(effectivePolicy.getId(),
-                                 iip.getLatestSnapshotId());
-      }
+      // TODO: handle effective storage policy id
+//      BlockStoragePolicy effectivePolicy =
+//          bm.getStoragePolicy(parent.getLastINode().storagePolicyId());
+//
+//      if (effectivePolicy != null &&
+//          effectivePolicy.isCopyOnCreateFile()) {
+//        // Copy effective policy from ancestor directory to current file.
+//        file.storagePolicyId(effectivePolicy.getId());
+//      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c9c72b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index a9f0c3e..370050d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -722,40 +722,41 @@ public class FSEditLog implements LogsPurgeable {
    * Add open lease record to edit log. 
    * Records the block locations of the last block.
    */
-  public void logOpenFile(String path, INodeFile newNode, boolean overwrite,
-      boolean toLogRpcIds) {
-    Preconditions.checkArgument(newNode.isUnderConstruction());
-    PermissionStatus permissions = newNode.getPermissionStatus();
+  public void logOpenFile(StringMap ugid, String path, FlatINode inode,
+      boolean overwrite, boolean toLogRpcIds) {
+    FlatINodeFileFeature file = inode.feature(FlatINodeFileFeature.class);
+    Preconditions.checkArgument(file != null && file.inConstruction());
+    PermissionStatus permissions = inode.permissionStatus(ugid);
     AddOp op = AddOp.getInstance(cache.get())
-      .setInodeId(newNode.getId())
+      .setInodeId(inode.id())
       .setPath(path)
-      .setReplication(newNode.getFileReplication())
-      .setModificationTime(newNode.getModificationTime())
-      .setAccessTime(newNode.getAccessTime())
-      .setBlockSize(newNode.getPreferredBlockSize())
-      .setBlocks(newNode.getBlocks())
+      .setReplication(file.replication())
+      .setModificationTime(inode.mtime())
+      .setAccessTime(inode.atime())
+      .setBlockSize(file.blockSize())
+      .setBlocks(file.blocks())
       .setPermissionStatus(permissions)
-      .setClientName(newNode.getFileUnderConstructionFeature().getClientName())
-      .setClientMachine(
-          newNode.getFileUnderConstructionFeature().getClientMachine())
+      .setClientName(file.clientName())
+      .setClientMachine(file.clientMachine())
       .setOverwrite(overwrite)
-      .setStoragePolicyId(newNode.getLocalStoragePolicyID());
-
-    AclFeature f = newNode.getAclFeature();
-    if (f != null) {
-      op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));
-    }
+      .setStoragePolicyId(file.storagePolicyId());
 
-    XAttrFeature x = newNode.getXAttrFeature();
-    if (x != null) {
-      op.setXAttrs(x.getXAttrs());
-    }
+    // TODO: Handle ACL / xattrs
+//    AclFeature f = inode.getAclFeature();
+//    if (f != null) {
+//      op.setAclEntries(AclStorage.readINodeLogicalAcl(inode));
+//    }
+//
+//    XAttrFeature x = inode.getXAttrFeature();
+//    if (x != null) {
+//      op.setXAttrs(x.getXAttrs());
+//    }
 
     logRpcIds(op, toLogRpcIds);
     logEdit(op);
   }
 
-  /** 
+  /**
    * Add close lease record to edit log.
    */
   public void logCloseFile(String path, INodeFile newNode) {
@@ -767,10 +768,10 @@ public class FSEditLog implements LogsPurgeable {
       .setBlockSize(newNode.getPreferredBlockSize())
       .setBlocks(newNode.getBlocks())
       .setPermissionStatus(newNode.getPermissionStatus());
-    
+
     logEdit(op);
   }
-  
+
   public void logAddBlock(String path, INodeFile file) {
     Preconditions.checkArgument(file.isUnderConstruction());
     BlockInfoContiguous[] blocks = file.getBlocks();
@@ -781,7 +782,24 @@ public class FSEditLog implements LogsPurgeable {
         .setPenultimateBlock(pBlock).setLastBlock(lastBlock);
     logEdit(op);
   }
-  
+
+  /**
+   * Add close lease record to edit log.
+   */
+  public void logCloseFile(StringMap ugid, String path, FlatINode inode) {
+    Preconditions.checkArgument(inode.isFile());
+    FlatINodeFileFeature file = inode.feature(FlatINodeFileFeature.class);
+    CloseOp op = CloseOp.getInstance(cache.get())
+        .setPath(path).setReplication(file.replication())
+        .setModificationTime(inode.mtime())
+        .setAccessTime(inode.atime())
+        .setBlockSize(file.blockSize())
+        .setBlocks(file.blocks())
+        .setPermissionStatus(inode.permissionStatus(ugid));
+
+    logEdit(op);
+  }
+
   public void logUpdateBlocks(String path, INodeFile file, boolean toLogRpcIds) {
     Preconditions.checkArgument(file.isUnderConstruction());
     UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get())
@@ -798,8 +816,7 @@ public class FSEditLog implements LogsPurgeable {
     PermissionStatus permissions = newNode.getPermissionStatus();
     MkdirOp op = MkdirOp.getInstance(cache.get())
       .setInodeId(newNode.getId())
-      .setPath(path)
-      .setTimestamp(newNode.getModificationTime())
+      .setPath(path).setTimestamp(newNode.getModificationTime())
       .setPermissionStatus(permissions);
 
     AclFeature f = newNode.getAclFeature();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c9c72b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index c8e565e..d73c3d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -499,7 +499,17 @@ public abstract class FSEditLogOp {
       this.blocks = blocks;
       return (T)this;
     }
-    
+
+    <T extends AddCloseOp> T setBlocks(Iterable<Block> blocks) {
+      ArrayList<Block> b = Lists.newArrayList(blocks);
+      if (b.size() > MAX_BLOCKS) {
+        throw new RuntimeException("Can't have more than " + MAX_BLOCKS +
+                                       " in an AddCloseOp.");
+      }
+      this.blocks = b.toArray(new Block[b.size()]);
+      return (T)this;
+    }
+
     @Override
     public Block[] getBlocks() {
       return blocks;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c9c72b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java
index 80379c4..b97b11f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java
@@ -154,4 +154,9 @@ class RWTransaction extends Transaction {
       Options.Rename[] options) {
     fsd.getEditLog().logRename(src, dst, mtime, logRetryCache, options);
   }
+
+  public void logOpenFile(StringMap ugid, String src, FlatINode inode,
+      boolean overwrite, boolean logRetryCache) {
+    fsd.getEditLog().logOpenFile(ugid, src, inode, overwrite, logRetryCache);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c9c72b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
index 0349251..21a8265 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.io.File;
 import java.io.IOException;
 
+import com.google.protobuf.ByteString;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -59,7 +60,7 @@ public class CreateEditsLog {
   static void addFiles(FSEditLog editLog, int numFiles, short replication, 
                          int blocksPerFile, long startingBlockId, long blockSize,
                          FileNameGenerator nameGenerator) {
-    
+    StringMap ugid = new StringMap();
     PermissionStatus p = new PermissionStatus("joeDoe", "people",
                                       new FsPermission((short)0777));
     INodeId inodeId = new INodeId();
@@ -81,11 +82,7 @@ public class CreateEditsLog {
          blocks[iB].setBlockId(currentBlockId++);
       }
 
-      final INodeFile inode = new INodeFile(inodeId.nextValue(), null,
-          p, 0L, 0L, blocks, replication, blockSize);
-      inode.toUnderConstruction("", "");
-
-     // Append path to filename with information about blockIDs 
+     // Append path to filename with information about blockIDs
       String path = "_" + iF + "_B" + blocks[0].getBlockId() + 
                     "_to_B" + blocks[blocksPerFile-1].getBlockId() + "_";
       String filePath = nameGenerator.getNextFileName("");
@@ -96,11 +93,18 @@ public class CreateEditsLog {
         dirInode = new INodeDirectory(inodeId.nextValue(), null, p, 0L);
         editLog.logMkDir(currentDir, dirInode);
       }
-      INodeFile fileUc = new INodeFile(inodeId.nextValue(), null,
-          p, 0L, 0L, BlockInfoContiguous.EMPTY_ARRAY, replication, blockSize);
-      fileUc.toUnderConstruction("", "");
-      editLog.logOpenFile(filePath, fileUc, false, false);
-      editLog.logCloseFile(filePath, inode);
+      ByteString file = new FlatINodeFileFeature.Builder()
+          .replication(replication)
+          .blockSize(blockSize)
+          .build();
+      ByteString inode = new FlatINode.Builder()
+          .id(inodeId.nextValue())
+          .userId(ugid.getId(p.getUserName()))
+          .groupId(ugid.getId(p.getGroupName()))
+          .addFeature(FlatINodeFileFeature.wrap(file))
+          .build();
+      editLog.logOpenFile(ugid, filePath, FlatINode.wrap(inode), false, false);
+      editLog.logCloseFile(ugid, filePath, FlatINode.wrap(inode));
 
       if (currentBlockId - bidAtSync >= 2000) { // sync every 2K blocks
         editLog.logSync();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c9c72b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
index 1e42e34..bbd5f5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
@@ -53,6 +53,7 @@ import java.util.concurrent.Executors;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.protobuf.ByteString;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -202,14 +203,24 @@ public class TestEditLog {
       PermissionStatus p = namesystem.createFsOwnerPermissions(
                                           new FsPermission((short)0777));
       FSEditLog editLog = namesystem.getEditLog();
-
+      FSDirectory fsd = namesystem.getFSDirectory();
+      StringMap ugid = namesystem.getFSDirectory().ugid();
       for (int i = 0; i < numTransactions; i++) {
-        INodeFile inode = new INodeFile(namesystem.dir.allocateNewInodeId(), null,
-            p, 0L, 0L, BlockInfoContiguous.EMPTY_ARRAY, replication, blockSize);
-        inode.toUnderConstruction("", "");
-
-        editLog.logOpenFile("/filename" + (startIndex + i), inode, false, false);
-        editLog.logCloseFile("/filename" + (startIndex + i), inode);
+        ByteString file = new FlatINodeFileFeature.Builder()
+            .replication(replication)
+            .blockSize(blockSize)
+            .build();
+        ByteString inodeBytes = new FlatINode.Builder()
+            .id(fsd.allocateNewInodeId())
+            .userId(ugid.getId(p.getUserName()))
+            .groupId(ugid.getId(p.getGroupName()))
+            .addFeature(FlatINodeFileFeature.wrap(file))
+            .build();
+        FlatINode inode = FlatINode.wrap(inodeBytes);
+
+        editLog.logOpenFile(ugid, "/filename" + (startIndex + i), inode, false,
+                            false);
+        editLog.logCloseFile(ugid, "/filename" + (startIndex + i), inode);
         editLog.logSync();
       }
     }

Reply via email to