HDFS-10636. Modify ReplicaInfo to remove the assumption that replica metadata 
and data are stored in java.io.File. (Virajith Jalaparti via lei)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/86c9862b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/86c9862b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/86c9862b

Branch: refs/heads/HADOOP-12756
Commit: 86c9862bec0248d671e657aa56094a2919b8ac14
Parents: 1c0d18f
Author: Lei Xu <l...@apache.org>
Authored: Tue Sep 13 12:53:37 2016 -0700
Committer: Lei Xu <l...@apache.org>
Committed: Tue Sep 13 12:54:14 2016 -0700

----------------------------------------------------------------------
 .../server/datanode/BlockPoolSliceStorage.java  |  16 +-
 .../hdfs/server/datanode/BlockReceiver.java     |   2 +-
 .../hdfs/server/datanode/BlockSender.java       |   7 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |   2 +-
 .../server/datanode/DataNodeFaultInjector.java  |   2 +-
 .../hdfs/server/datanode/DataStorage.java       |   4 +-
 .../hdfs/server/datanode/DirectoryScanner.java  |  10 +-
 .../hdfs/server/datanode/FinalizedReplica.java  |  27 +-
 .../hdfs/server/datanode/LocalReplica.java      | 479 ++++++++++
 .../server/datanode/LocalReplicaInPipeline.java | 417 +++++++++
 .../server/datanode/ReplicaBeingWritten.java    |  16 +-
 .../hdfs/server/datanode/ReplicaBuilder.java    | 252 +++++
 .../hdfs/server/datanode/ReplicaHandler.java    |   6 +-
 .../hdfs/server/datanode/ReplicaInPipeline.java | 324 ++-----
 .../datanode/ReplicaInPipelineInterface.java    |  86 --
 .../hdfs/server/datanode/ReplicaInfo.java       | 370 ++++----
 .../server/datanode/ReplicaUnderRecovery.java   |  30 +-
 .../datanode/ReplicaWaitingToBeRecovered.java   |  27 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java |   9 +-
 .../datanode/fsdataset/impl/BlockPoolSlice.java |  74 +-
 .../impl/FsDatasetAsyncDiskService.java         |  71 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 908 +++++++------------
 .../datanode/fsdataset/impl/FsDatasetUtil.java  |  18 +
 .../datanode/fsdataset/impl/FsVolumeImpl.java   | 154 +++-
 .../datanode/fsdataset/impl/FsVolumeList.java   |   2 +-
 .../impl/RamDiskAsyncLazyPersistService.java    |  34 +-
 .../TestClientProtocolForPipelineRecovery.java  |   4 +-
 .../apache/hadoop/hdfs/TestCrcCorruption.java   |   6 +-
 .../server/datanode/SimulatedFSDataset.java     |  30 +-
 .../datanode/TestBlockPoolSliceStorage.java     |   6 +-
 .../hdfs/server/datanode/TestBlockRecovery.java |   2 +-
 .../datanode/TestDataNodeRollingUpgrade.java    |   6 +-
 .../server/datanode/TestDirectoryScanner.java   |  17 +-
 .../server/datanode/TestSimulatedFSDataset.java |   2 +-
 .../hdfs/server/datanode/TestTransferRbw.java   |   4 +-
 .../extdataset/ExternalDatasetImpl.java         |   6 +-
 .../extdataset/ExternalReplicaInPipeline.java   |  26 +-
 .../extdataset/TestExternalDataset.java         |   4 +-
 .../fsdataset/impl/FsDatasetImplTestUtils.java  |  43 +-
 .../fsdataset/impl/FsDatasetTestUtil.java       |  20 +-
 .../fsdataset/impl/TestWriteToReplica.java      |   4 +-
 41 files changed, 2219 insertions(+), 1308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index fd90ae9..fd89611 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -741,7 +742,20 @@ public class BlockPoolSliceStorage extends Storage {
    *
    * @return the trash directory for a given block file that is being deleted.
    */
-  public String getTrashDirectory(File blockFile) {
+  public String getTrashDirectory(ReplicaInfo info) {
+
+    URI blockURI = info.getBlockURI();
+    try{
+      File blockFile = new File(blockURI);
+      return getTrashDirectory(blockFile);
+    } catch (IllegalArgumentException e) {
+      LOG.warn("Failed to get block file for replica " + info, e);
+    }
+
+    return null;
+  }
+
+  private String getTrashDirectory(File blockFile) {
     if (isTrashAllowed(blockFile)) {
       Matcher matcher = 
BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
       String trashDirectory = matcher.replaceFirst("$1$2" + TRASH_ROOT_DIR + 
"$4");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 522d577..39419c1 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -121,7 +121,7 @@ class BlockReceiver implements Closeable {
   /** the block to receive */
   private final ExtendedBlock block; 
   /** the replica to write */
-  private ReplicaInPipelineInterface replicaInfo;
+  private ReplicaInPipeline replicaInfo;
   /** pipeline stage */
   private final BlockConstructionStage stage;
   private final boolean isTransfer;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 9d9502b..c3ba2eb 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -248,8 +249,8 @@ class BlockSender implements java.io.Closeable {
       }
       // if there is a write in progress
       ChunkChecksum chunkChecksum = null;
-      if (replica instanceof ReplicaBeingWritten) {
-        final ReplicaBeingWritten rbw = (ReplicaBeingWritten)replica;
+      if (replica.getState() == ReplicaState.RBW) {
+        final ReplicaInPipeline rbw = (ReplicaInPipeline) replica;
         waitForMinLength(rbw, startOffset + length);
         chunkChecksum = rbw.getLastChecksumAndDataLen();
       }
@@ -473,7 +474,7 @@ class BlockSender implements java.io.Closeable {
    * @param len minimum length to reach
    * @throws IOException on failing to reach the len in given wait time
    */
-  private static void waitForMinLength(ReplicaBeingWritten rbw, long len)
+  private static void waitForMinLength(ReplicaInPipeline rbw, long len)
       throws IOException {
     // Wait for 3 seconds for rbw replica to reach the minimum length
     for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; i++) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 0025040..09ecac1 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3474,4 +3474,4 @@ public class DataNode extends ReconfigurableBase
   void setBlockScanner(BlockScanner blockScanner) {
     this.blockScanner = blockScanner;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 931c124..aa06aa1 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -56,6 +56,6 @@ public class DataNodeFaultInjector {
 
   public void failMirrorConnection() throws IOException { }
 
-  public void failPipeline(ReplicaInPipelineInterface replicaInfo,
+  public void failPipeline(ReplicaInPipeline replicaInfo,
       String mirrorAddr) throws IOException { }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 0e6b339..7e620c2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -204,9 +204,9 @@ public class DataStorage extends Storage {
    * @return trash directory if rolling upgrade is in progress, null
    *         otherwise.
    */
-  public String getTrashDirectoryForBlockFile(String bpid, File blockFile) {
+  public String getTrashDirectoryForReplica(String bpid, ReplicaInfo info) {
     if (trashEnabledBpids.contains(bpid)) {
-      return getBPStorage(bpid).getTrashDirectory(blockFile);
+      return getBPStorage(bpid).getTrashDirectory(info);
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index f9ebab9..c50bfaf 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -597,14 +597,14 @@ public class DirectoryScanner implements Runnable {
         diffs.put(bpid, diffRecord);
         
         statsRecord.totalBlocks = blockpoolReport.length;
-        List<FinalizedReplica> bl = dataset.getFinalizedBlocks(bpid);
-        FinalizedReplica[] memReport = bl.toArray(new 
FinalizedReplica[bl.size()]);
+        List<ReplicaInfo> bl = dataset.getFinalizedBlocks(bpid);
+        ReplicaInfo[] memReport = bl.toArray(new ReplicaInfo[bl.size()]);
         Arrays.sort(memReport); // Sort based on blockId
   
         int d = 0; // index for blockpoolReport
         int m = 0; // index for memReprot
         while (m < memReport.length && d < blockpoolReport.length) {
-          FinalizedReplica memBlock = memReport[m];
+          ReplicaInfo memBlock = memReport[m];
           ScanInfo info = blockpoolReport[d];
           if (info.getBlockId() < memBlock.getBlockId()) {
             if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
@@ -633,7 +633,7 @@ public class DirectoryScanner implements Runnable {
             // or block file length is different than expected
             statsRecord.mismatchBlocks++;
             addDifference(diffRecord, statsRecord, info);
-          } else if (info.getBlockFile().compareTo(memBlock.getBlockFile()) != 
0) {
+          } else if (memBlock.compareWith(info) != 0) {
             // volumeMap record and on-disk files don't match.
             statsRecord.duplicateBlocks++;
             addDifference(diffRecord, statsRecord, info);
@@ -652,7 +652,7 @@ public class DirectoryScanner implements Runnable {
           }
         }
         while (m < memReport.length) {
-          FinalizedReplica current = memReport[m++];
+          ReplicaInfo current = memReport[m++];
           addDifference(diffRecord, statsRecord,
                         current.getBlockId(), current.getVolume());
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
index 8daeb51..81a4ab4 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
@@ -22,11 +22,12 @@ import java.io.File;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 
 /**
  * This class describes a replica that has been finalized.
  */
-public class FinalizedReplica extends ReplicaInfo {
+public class FinalizedReplica extends LocalReplica {
 
   /**
    * Constructor
@@ -88,4 +89,28 @@ public class FinalizedReplica extends ReplicaInfo {
   public String toString() {
     return super.toString();
   }
+
+  @Override
+  public ReplicaInfo getOriginalReplica() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getOriginalReplica");
+  }
+
+  @Override
+  public long getRecoveryID() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getRecoveryID");
+  }
+
+  @Override
+  public void setRecoveryID(long recoveryId) {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support setRecoveryID");
+  }
+
+  @Override
+  public ReplicaRecoveryInfo createInfo() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support createInfo");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
new file mode 100644
index 0000000..cbfc9a5
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
@@ -0,0 +1,479 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfo;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class is used for all replicas which are on local storage media
+ * and hence, are backed by files.
+ */
+abstract public class LocalReplica extends ReplicaInfo {
+
+  /**
+   * Base directory containing numerically-identified sub directories and
+   * possibly blocks.
+   */
+  private File baseDir;
+
+  /**
+   * Whether or not this replica's parent directory includes subdirs, in which
+   * case we can generate them based on the replica's block ID
+   */
+  private boolean hasSubdirs;
+
+  private static final Map<String, File> internedBaseDirs = new 
HashMap<String, File>();
+
+  static final Log LOG = LogFactory.getLog(LocalReplica.class);
+  private final static boolean IS_NATIVE_IO_AVAIL;
+  static {
+    IS_NATIVE_IO_AVAIL = NativeIO.isAvailable();
+    if (Path.WINDOWS && !IS_NATIVE_IO_AVAIL) {
+      LOG.warn("Data node cannot fully support concurrent reading"
+          + " and writing without native code extensions on Windows.");
+    }
+  }
+
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  LocalReplica(Block block, FsVolumeSpi vol, File dir) {
+    this(block.getBlockId(), block.getNumBytes(),
+        block.getGenerationStamp(), vol, dir);
+  }
+
+  /**
+   * Constructor
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  LocalReplica(long blockId, long len, long genStamp,
+      FsVolumeSpi vol, File dir) {
+    super(vol, blockId, len, genStamp);
+    setDirInternal(dir);
+  }
+
+  /**
+   * Copy constructor.
+   * @param from the source replica
+   */
+  LocalReplica(LocalReplica from) {
+    this(from, from.getVolume(), from.getDir());
+  }
+
+  /**
+   * Get the full path of this replica's data file.
+   * @return the full path of this replica's data file
+   */
+  @VisibleForTesting
+  public File getBlockFile() {
+    return new File(getDir(), getBlockName());
+  }
+
+  /**
+   * Get the full path of this replica's meta file.
+   * @return the full path of this replica's meta file
+   */
+  @VisibleForTesting
+  public File getMetaFile() {
+    return new File(getDir(),
+        DatanodeUtil.getMetaName(getBlockName(), getGenerationStamp()));
+  }
+
+  /**
+   * Return the parent directory path where this replica is located.
+   * @return the parent directory path where this replica is located
+   */
+  protected File getDir() {
+    return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir,
+        getBlockId()) : baseDir;
+  }
+
+  /**
+   * Set the parent directory where this replica is located.
+   * @param dir the parent directory where the replica is located
+   */
+  private void setDirInternal(File dir) {
+    if (dir == null) {
+      baseDir = null;
+      return;
+    }
+
+    ReplicaDirInfo dirInfo = parseBaseDir(dir);
+    this.hasSubdirs = dirInfo.hasSubidrs;
+
+    synchronized (internedBaseDirs) {
+      if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) {
+        // Create a new String path of this file and make a brand new File 
object
+        // to guarantee we drop the reference to the underlying char[] storage.
+        File baseDir = new File(dirInfo.baseDirPath);
+        internedBaseDirs.put(dirInfo.baseDirPath, baseDir);
+      }
+      this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath);
+    }
+  }
+
+  @VisibleForTesting
+  public static class ReplicaDirInfo {
+    public String baseDirPath;
+    public boolean hasSubidrs;
+
+    public ReplicaDirInfo (String baseDirPath, boolean hasSubidrs) {
+      this.baseDirPath = baseDirPath;
+      this.hasSubidrs = hasSubidrs;
+    }
+  }
+
+  @VisibleForTesting
+  public static ReplicaDirInfo parseBaseDir(File dir) {
+
+    File currentDir = dir;
+    boolean hasSubdirs = false;
+    while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
+      hasSubdirs = true;
+      currentDir = currentDir.getParentFile();
+    }
+
+    return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs);
+  }
+
+  /**
+   * Copy specified file into a temporary file. Then rename the
+   * temporary file to the original name. This will cause any
+   * hardlinks to the original file to be removed. The temporary
+   * files are created in the same directory. The temporary files will
+   * be recovered (especially on Windows) on datanode restart.
+   */
+  private void breakHardlinks(File file, Block b) throws IOException {
+    File tmpFile = DatanodeUtil.createTmpFile(b, 
DatanodeUtil.getUnlinkTmpFile(file));
+    try (FileInputStream in = new FileInputStream(file)) {
+      try (FileOutputStream out = new FileOutputStream(tmpFile)){
+        IOUtils.copyBytes(in, out, 16 * 1024);
+      }
+      if (file.length() != tmpFile.length()) {
+        throw new IOException("Copy of file " + file + " size " + 
file.length()+
+                              " into file " + tmpFile +
+                              " resulted in a size of " + tmpFile.length());
+      }
+      FileUtil.replaceFile(tmpFile, file);
+    } catch (IOException e) {
+      boolean done = tmpFile.delete();
+      if (!done) {
+        DataNode.LOG.info("detachFile failed to delete temporary file " +
+                          tmpFile);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * This function "breaks hardlinks" to the current replica file.
+   *
+   * When doing a DataNode upgrade, we create a bunch of hardlinks to each 
block
+   * file.  This cleverly ensures that both the old and the new storage
+   * directories can contain the same block file, without using additional 
space
+   * for the data.
+   *
+   * However, when we want to append to the replica file, we need to "break" 
the
+   * hardlink to ensure that the old snapshot continues to contain the old data
+   * length.  If we failed to do that, we could roll back to the previous/
+   * directory during a downgrade, and find that the block contents were longer
+   * than they were at the time of upgrade.
+   *
+   * @return true only if data was copied.
+   * @throws IOException
+   */
+  public boolean breakHardLinksIfNeeded() throws IOException {
+    File file = getBlockFile();
+    if (file == null || getVolume() == null) {
+      throw new IOException("detachBlock:Block not found. " + this);
+    }
+    File meta = getMetaFile();
+
+    int linkCount = HardLink.getLinkCount(file);
+    if (linkCount > 1) {
+      DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
+          "block " + this);
+      breakHardlinks(file, this);
+    }
+    if (HardLink.getLinkCount(meta) > 1) {
+      breakHardlinks(meta, this);
+    }
+    return true;
+  }
+
+  @Override
+  public URI getBlockURI() {
+    return getBlockFile().toURI();
+  }
+
+  @Override
+  public InputStream getDataInputStream(long seekOffset) throws IOException {
+
+    File blockFile = getBlockFile();
+    if (IS_NATIVE_IO_AVAIL) {
+      return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
+    } else {
+      try {
+        return FsDatasetUtil.openAndSeek(blockFile, seekOffset);
+      } catch (FileNotFoundException fnfe) {
+        throw new IOException("Block " + this + " is not valid. " +
+            "Expected block file at " + blockFile + " does not exist.");
+      }
+    }
+  }
+
+  @Override
+  public OutputStream getDataOutputStream(boolean append) throws IOException {
+    return new FileOutputStream(getBlockFile(), append);
+  }
+
+  @Override
+  public boolean blockDataExists() {
+    return getBlockFile().exists();
+  }
+
+  @Override
+  public boolean deleteBlockData() {
+    return getBlockFile().delete();
+  }
+
+  @Override
+  public long getBlockDataLength() {
+    return getBlockFile().length();
+  }
+
+  @Override
+  public URI getMetadataURI() {
+    return getMetaFile().toURI();
+  }
+
+  @Override
+  public LengthInputStream getMetadataInputStream(long offset)
+      throws IOException {
+    File meta = getMetaFile();
+    return new LengthInputStream(
+        FsDatasetUtil.openAndSeek(meta, offset), meta.length());
+  }
+
+  @Override
+  public OutputStream getMetadataOutputStream(boolean append)
+      throws IOException {
+    return new FileOutputStream(getMetaFile(), append);
+  }
+
+  @Override
+  public boolean metadataExists() {
+    return getMetaFile().exists();
+  }
+
+  @Override
+  public boolean deleteMetadata() {
+    return getMetaFile().delete();
+  }
+
+  @Override
+  public long getMetadataLength() {
+    return getMetaFile().length();
+  }
+
+  @Override
+  public boolean renameMeta(URI destURI) throws IOException {
+    return renameFile(getMetaFile(), new File(destURI));
+  }
+
+  @Override
+  public boolean renameData(URI destURI) throws IOException {
+    return renameFile(getBlockFile(), new File(destURI));
+  }
+
+  private boolean renameFile(File srcfile, File destfile) throws IOException {
+    try {
+      NativeIO.renameTo(srcfile, destfile);
+      return true;
+    } catch (IOException e) {
+      throw new IOException("Failed to move block file for " + this
+          + " from " + srcfile + " to " + destfile.getAbsolutePath(), e);
+    }
+  }
+
+  @Override
+  public void updateWithReplica(StorageLocation replicaLocation) {
+    // for local replicas, the replica location is assumed to be a file.
+    File diskFile = replicaLocation.getFile();
+    if (null == diskFile) {
+      setDirInternal(null);
+    } else {
+      setDirInternal(diskFile.getParentFile());
+    }
+  }
+
+  @Override
+  public boolean getPinning(LocalFileSystem localFS) throws IOException {
+    FileStatus fss =
+        localFS.getFileStatus(new Path(getBlockFile().getAbsolutePath()));
+    return fss.getPermission().getStickyBit();
+  }
+
+  @Override
+  public void setPinning(LocalFileSystem localFS) throws IOException {
+    File f = getBlockFile();
+    Path p = new Path(f.getAbsolutePath());
+
+    FsPermission oldPermission = localFS.getFileStatus(
+        new Path(f.getAbsolutePath())).getPermission();
+    //sticky bit is used for pinning purpose
+    FsPermission permission = new FsPermission(oldPermission.getUserAction(),
+        oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
+    localFS.setPermission(p, permission);
+  }
+
+  @Override
+  public void bumpReplicaGS(long newGS) throws IOException {
+    long oldGS = getGenerationStamp();
+    File oldmeta = getMetaFile();
+    setGenerationStamp(newGS);
+    File newmeta = getMetaFile();
+
+    // rename meta file to new GS
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+    }
+    try {
+      // calling renameMeta on the ReplicaInfo doesn't work here
+      NativeIO.renameTo(oldmeta, newmeta);
+    } catch (IOException e) {
+      setGenerationStamp(oldGS); // restore old GS
+      throw new IOException("Block " + this + " reopen failed. " +
+                            " Unable to move meta file  " + oldmeta +
+                            " to " + newmeta, e);
+    }
+  }
+
+  @Override
+  public void truncateBlock(long newLength) throws IOException {
+    truncateBlock(getBlockFile(), getMetaFile(), getNumBytes(), newLength);
+  }
+
+  @Override
+  public int compareWith(ScanInfo info) {
+    return info.getBlockFile().compareTo(getBlockFile());
+  }
+
+  static public void truncateBlock(File blockFile, File metaFile,
+      long oldlen, long newlen) throws IOException {
+    LOG.info("truncateBlock: blockFile=" + blockFile
+        + ", metaFile=" + metaFile
+        + ", oldlen=" + oldlen
+        + ", newlen=" + newlen);
+
+    if (newlen == oldlen) {
+      return;
+    }
+    if (newlen > oldlen) {
+      throw new IOException("Cannot truncate block to from oldlen (=" + oldlen
+          + ") to newlen (=" + newlen + ")");
+    }
+
+    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
+    int checksumsize = dcs.getChecksumSize();
+    int bpc = dcs.getBytesPerChecksum();
+    long n = (newlen - 1)/bpc + 1;
+    long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize;
+    long lastchunkoffset = (n - 1)*bpc;
+    int lastchunksize = (int)(newlen - lastchunkoffset);
+    byte[] b = new byte[Math.max(lastchunksize, checksumsize)];
+
+    RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
+    try {
+      //truncate blockFile
+      blockRAF.setLength(newlen);
+
+      //read last chunk
+      blockRAF.seek(lastchunkoffset);
+      blockRAF.readFully(b, 0, lastchunksize);
+    } finally {
+      blockRAF.close();
+    }
+
+    //compute checksum
+    dcs.update(b, 0, lastchunksize);
+    dcs.writeValue(b, 0, false);
+
+    //update metaFile
+    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+    try {
+      metaRAF.setLength(newmetalen);
+      metaRAF.seek(newmetalen - checksumsize);
+      metaRAF.write(b, 0, checksumsize);
+    } finally {
+      metaRAF.close();
+    }
+  }
+
+  @Override
+  public void copyMetadata(URI destination) throws IOException {
+    //for local replicas, we assume the destination URI is file
+    Storage.nativeCopyFileUnbuffered(getMetaFile(),
+        new File(destination), true);
+  }
+
+  @Override
+  public void copyBlockdata(URI destination) throws IOException {
+    //for local replicas, we assume the destination URI is file
+    Storage.nativeCopyFileUnbuffered(getBlockFile(),
+        new File(destination), true);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
new file mode 100644
index 0000000..bc7bc6d
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This class defines a replica in a pipeline, which
+ * includes a persistent replica being written to by a dfs client or
+ * a temporary replica being replicated by a source datanode or
+ * being copied for the balancing purpose.
+ *
+ * The base class implements a temporary replica
+ */
+public class LocalReplicaInPipeline extends LocalReplica
+                        implements ReplicaInPipeline {
+  private long bytesAcked;
+  private long bytesOnDisk;
+  private byte[] lastChecksum;
+  private AtomicReference<Thread> writer = new AtomicReference<Thread>();
+
+  /**
+   * Bytes reserved for this replica on the containing volume.
+   * Based off difference between the estimated maximum block length and
+   * the bytes already written to this block.
+   */
+  private long bytesReserved;
+  private final long originalBytesReserved;
+
+  /**
+   * Constructor for a zero length replica.
+   * @param blockId block id
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param bytesToReserve disk space to reserve for this replica, based on
+   *                       the estimated maximum block length.
+   */
+  public LocalReplicaInPipeline(long blockId, long genStamp,
+        FsVolumeSpi vol, File dir, long bytesToReserve) {
+    this(blockId, 0L, genStamp, vol, dir, Thread.currentThread(),
+        bytesToReserve);
+  }
+
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param writer a thread that is writing to this replica
+   */
+  LocalReplicaInPipeline(Block block,
+      FsVolumeSpi vol, File dir, Thread writer) {
+    this(block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
+        vol, dir, writer, 0L);
+  }
+
+  /**
+   * Constructor
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param writer a thread that is writing to this replica
+   * @param bytesToReserve disk space to reserve for this replica, based on
+   *                       the estimated maximum block length.
+   */
+  LocalReplicaInPipeline(long blockId, long len, long genStamp,
+      FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) {
+    super(blockId, len, genStamp, vol, dir);
+    this.bytesAcked = len;
+    this.bytesOnDisk = len;
+    this.writer.set(writer);
+    this.bytesReserved = bytesToReserve;
+    this.originalBytesReserved = bytesToReserve;
+  }
+
+  /**
+   * Copy constructor.
+   * @param from where to copy from
+   */
+  public LocalReplicaInPipeline(LocalReplicaInPipeline from) {
+    super(from);
+    this.bytesAcked = from.getBytesAcked();
+    this.bytesOnDisk = from.getBytesOnDisk();
+    this.writer.set(from.writer.get());
+    this.bytesReserved = from.bytesReserved;
+    this.originalBytesReserved = from.originalBytesReserved;
+  }
+
+  @Override
+  public long getVisibleLength() {
+    return -1;
+  }
+
+  @Override  //ReplicaInfo
+  public ReplicaState getState() {
+    return ReplicaState.TEMPORARY;
+  }
+
+  @Override // ReplicaInPipeline
+  public long getBytesAcked() {
+    return bytesAcked;
+  }
+
+  @Override // ReplicaInPipeline
+  public void setBytesAcked(long bytesAcked) {
+    long newBytesAcked = bytesAcked - this.bytesAcked;
+    this.bytesAcked = bytesAcked;
+
+    // Once bytes are ACK'ed we can release equivalent space from the
+    // volume's reservedForRbw count. We could have released it as soon
+    // as the write-to-disk completed but that would be inefficient.
+    getVolume().releaseReservedSpace(newBytesAcked);
+    bytesReserved -= newBytesAcked;
+  }
+
+  @Override // ReplicaInPipeline
+  public long getBytesOnDisk() {
+    return bytesOnDisk;
+  }
+
+  @Override
+  public long getBytesReserved() {
+    return bytesReserved;
+  }
+
+  @Override
+  public long getOriginalBytesReserved() {
+    return originalBytesReserved;
+  }
+
+  @Override // ReplicaInPipeline
+  public void releaseAllBytesReserved() {
+    getVolume().releaseReservedSpace(bytesReserved);
+    getVolume().releaseLockedMemory(bytesReserved);
+    bytesReserved = 0;
+  }
+
+  @Override // ReplicaInPipeline
+  public synchronized void setLastChecksumAndDataLen(long dataLength,
+      byte[] checksum) {
+    this.bytesOnDisk = dataLength;
+    this.lastChecksum = checksum;
+  }
+
+  @Override // ReplicaInPipeline
+  public synchronized ChunkChecksum getLastChecksumAndDataLen() {
+    return new ChunkChecksum(getBytesOnDisk(), lastChecksum);
+  }
+
+  @Override // ReplicaInPipeline
+  public void setWriter(Thread writer) {
+    this.writer.set(writer);
+  }
+
+  @Override
+  public void interruptThread() {
+    Thread thread = writer.get();
+    if (thread != null && thread != Thread.currentThread()
+        && thread.isAlive()) {
+      thread.interrupt();
+    }
+  }
+
+  @Override  // Object
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+
+  /**
+   * Attempt to set the writer to a new value.
+   */
+  @Override // ReplicaInPipeline
+  public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) {
+    return writer.compareAndSet(prevWriter, newWriter);
+  }
+
+  /**
+   * Interrupt the writing thread and wait until it dies.
+   * @throws IOException the waiting is interrupted
+   */
+  @Override // ReplicaInPipeline
+  public void stopWriter(long xceiverStopTimeout) throws IOException {
+    while (true) {
+      Thread thread = writer.get();
+      if ((thread == null) || (thread == Thread.currentThread()) ||
+          (!thread.isAlive())) {
+        if (writer.compareAndSet(thread, null)) {
+          return; // Done
+        }
+        // The writer changed.  Go back to the start of the loop and attempt to
+        // stop the new writer.
+        continue;
+      }
+      thread.interrupt();
+      try {
+        thread.join(xceiverStopTimeout);
+        if (thread.isAlive()) {
+          // Our thread join timed out.
+          final String msg = "Join on writer thread " + thread + " timed out";
+          DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(thread));
+          throw new IOException(msg);
+        }
+      } catch (InterruptedException e) {
+        throw new IOException("Waiting for writer thread is interrupted.");
+      }
+    }
+  }
+
+  @Override  // Object
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  @Override // ReplicaInPipeline
+  public ReplicaOutputStreams createStreams(boolean isCreate,
+      DataChecksum requestedChecksum) throws IOException {
+    File blockFile = getBlockFile();
+    File metaFile = getMetaFile();
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("writeTo blockfile is " + blockFile +
+                         " of size " + blockFile.length());
+      DataNode.LOG.debug("writeTo metafile is " + metaFile +
+                         " of size " + metaFile.length());
+    }
+    long blockDiskSize = 0L;
+    long crcDiskSize = 0L;
+
+    // the checksum that should actually be used -- this
+    // may differ from requestedChecksum for appends.
+    final DataChecksum checksum;
+
+    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+
+    if (!isCreate) {
+      // For append or recovery, we must enforce the existing checksum.
+      // Also, verify that the file has correct lengths, etc.
+      boolean checkedMeta = false;
+      try {
+        BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
+        checksum = header.getChecksum();
+
+        if (checksum.getBytesPerChecksum() !=
+            requestedChecksum.getBytesPerChecksum()) {
+          throw new IOException("Client requested checksum " +
+              requestedChecksum + " when appending to an existing block " +
+              "with different chunk size: " + checksum);
+        }
+
+        int bytesPerChunk = checksum.getBytesPerChecksum();
+        int checksumSize = checksum.getChecksumSize();
+
+        blockDiskSize = bytesOnDisk;
+        crcDiskSize = BlockMetadataHeader.getHeaderSize() +
+          (blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize;
+        if (blockDiskSize > 0 &&
+            (blockDiskSize > blockFile.length() ||
+               crcDiskSize>metaFile.length())) {
+          throw new IOException("Corrupted block: " + this);
+        }
+        checkedMeta = true;
+      } finally {
+        if (!checkedMeta) {
+          // clean up in case of exceptions.
+          IOUtils.closeStream(metaRAF);
+        }
+      }
+    } else {
+      // for create, we can use the requested checksum
+      checksum = requestedChecksum;
+    }
+
+    FileOutputStream blockOut = null;
+    FileOutputStream crcOut = null;
+    try {
+      blockOut = new FileOutputStream(
+          new RandomAccessFile(blockFile, "rw").getFD());
+      crcOut = new FileOutputStream(metaRAF.getFD());
+      if (!isCreate) {
+        blockOut.getChannel().position(blockDiskSize);
+        crcOut.getChannel().position(crcDiskSize);
+      }
+      return new ReplicaOutputStreams(blockOut, crcOut, checksum,
+          getVolume().isTransientStorage());
+    } catch (IOException e) {
+      IOUtils.closeStream(blockOut);
+      IOUtils.closeStream(metaRAF);
+      throw e;
+    }
+  }
+
+  @Override
+  public OutputStream createRestartMetaStream() throws IOException {
+    File blockFile = getBlockFile();
+    File restartMeta = new File(blockFile.getParent()  +
+        File.pathSeparator + "." + blockFile.getName() + ".restart");
+    if (restartMeta.exists() && !restartMeta.delete()) {
+      DataNode.LOG.warn("Failed to delete restart meta file: " +
+          restartMeta.getPath());
+    }
+    return new FileOutputStream(restartMeta);
+  }
+
+  @Override
+  public String toString() {
+    return super.toString()
+        + "\n  bytesAcked=" + bytesAcked
+        + "\n  bytesOnDisk=" + bytesOnDisk;
+  }
+
+  @Override
+  public ReplicaInfo getOriginalReplica() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getOriginalReplica");
+  }
+
+  @Override
+  public long getRecoveryID() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getRecoveryID");
+  }
+
+  @Override
+  public void setRecoveryID(long recoveryId) {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support setRecoveryID");
+  }
+
+  @Override
+  public ReplicaRecoveryInfo createInfo(){
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support createInfo");
+  }
+
+  public void moveReplicaFrom(ReplicaInfo oldReplicaInfo, File newBlkFile)
+      throws IOException {
+
+    if (!(oldReplicaInfo instanceof LocalReplica)) {
+      throw new IOException("The source replica with blk id "
+          + oldReplicaInfo.getBlockId()
+          + " should be derived from LocalReplica");
+    }
+
+    LocalReplica localReplica = (LocalReplica) oldReplicaInfo;
+
+    File oldmeta = localReplica.getMetaFile();
+    File newmeta = getMetaFile();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+    }
+    try {
+      NativeIO.renameTo(oldmeta, newmeta);
+    } catch (IOException e) {
+      throw new IOException("Block " + oldReplicaInfo + " reopen failed. " +
+                            " Unable to move meta file  " + oldmeta +
+                            " to rbw dir " + newmeta, e);
+    }
+
+    File blkfile = localReplica.getBlockFile();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming " + blkfile + " to " + newBlkFile
+          + ", file length=" + blkfile.length());
+    }
+    try {
+      NativeIO.renameTo(blkfile, newBlkFile);
+    } catch (IOException e) {
+      try {
+        NativeIO.renameTo(newmeta, oldmeta);
+      } catch (IOException ex) {
+        LOG.warn("Cannot move meta file " + newmeta +
+            "back to the finalized directory " + oldmeta, ex);
+      }
+      throw new IOException("Block " + oldReplicaInfo + " reopen failed. " +
+                              " Unable to move block file " + blkfile +
+                              " to rbw dir " + newBlkFile, e);
+    }
+  }
+
+  @Override // ReplicaInPipeline
+  public ReplicaInfo getReplicaInfo() {
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
index 4a89493..262533e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
@@ -27,9 +27,9 @@ import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
  * Those are the replicas that
  * are created in a pipeline initiated by a dfs client.
  */
-public class ReplicaBeingWritten extends ReplicaInPipeline {
+public class ReplicaBeingWritten extends LocalReplicaInPipeline {
   /**
-   * Constructor for a zero length replica
+   * Constructor for a zero length replica.
    * @param blockId block id
    * @param genStamp replica generation stamp
    * @param vol volume where replica is located
@@ -37,25 +37,25 @@ public class ReplicaBeingWritten extends ReplicaInPipeline {
    * @param bytesToReserve disk space to reserve for this replica, based on
    *                       the estimated maximum block length.
    */
-  public ReplicaBeingWritten(long blockId, long genStamp, 
+  public ReplicaBeingWritten(long blockId, long genStamp,
         FsVolumeSpi vol, File dir, long bytesToReserve) {
     super(blockId, genStamp, vol, dir, bytesToReserve);
   }
-  
+
   /**
-   * Constructor
+   * Constructor.
    * @param block a block
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
    * @param writer a thread that is writing to this replica
    */
-  public ReplicaBeingWritten(Block block, 
+  public ReplicaBeingWritten(Block block,
       FsVolumeSpi vol, File dir, Thread writer) {
-    super( block, vol, dir, writer);
+    super(block, vol, dir, writer);
   }
 
   /**
-   * Constructor
+   * Constructor.
    * @param blockId block id
    * @param len replica length
    * @param genStamp replica generation stamp

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
new file mode 100644
index 0000000..280aaa0
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+
+/**
+ * This class is to be used as a builder for {@link ReplicaInfo} objects.
+ * The state of the replica is used to determine which object is instantiated.
+ */
+public class ReplicaBuilder {
+
+  private ReplicaState state;
+  private long blockId;
+  private long genStamp;
+  private long length;
+  private FsVolumeSpi volume;
+  private File directoryUsed;
+  private long bytesToReserve;
+  private Thread writer;
+  private long recoveryId;
+  private Block block;
+
+  private ReplicaInfo fromReplica;
+
+  public ReplicaBuilder(ReplicaState state) {
+    volume = null;
+    writer = null;
+    block = null;
+    length = -1;
+    this.state = state;
+  }
+
+  public ReplicaBuilder setState(ReplicaState state) {
+    this.state = state;
+    return this;
+  }
+
+  public ReplicaBuilder setBlockId(long blockId) {
+    this.blockId = blockId;
+    return this;
+  }
+
+  public ReplicaBuilder setGenerationStamp(long genStamp) {
+    this.genStamp = genStamp;
+    return this;
+  }
+
+  public ReplicaBuilder setLength(long length) {
+    this.length = length;
+    return this;
+  }
+
+  public ReplicaBuilder setFsVolume(FsVolumeSpi volume) {
+    this.volume = volume;
+    return this;
+  }
+
+  public ReplicaBuilder setDirectoryToUse(File dir) {
+    this.directoryUsed = dir;
+    return this;
+  }
+
+  public ReplicaBuilder setBytesToReserve(long bytesToReserve) {
+    this.bytesToReserve = bytesToReserve;
+    return this;
+  }
+
+  public ReplicaBuilder setWriterThread(Thread writer) {
+    this.writer = writer;
+    return this;
+  }
+
+  public ReplicaBuilder from(ReplicaInfo fromReplica) {
+    this.fromReplica = fromReplica;
+    return this;
+  }
+
+  public ReplicaBuilder setRecoveryId(long recoveryId) {
+    this.recoveryId = recoveryId;
+    return this;
+  }
+
+  public ReplicaBuilder setBlock(Block block) {
+    this.block = block;
+    return this;
+  }
+
+  public LocalReplicaInPipeline buildLocalReplicaInPipeline()
+      throws IllegalArgumentException {
+    LocalReplicaInPipeline info = null;
+    switch(state) {
+    case RBW:
+      info = buildRBW();
+      break;
+    case TEMPORARY:
+      info = buildTemporaryReplica();
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown replica state " + state);
+    }
+    return info;
+  }
+
+  private LocalReplicaInPipeline buildRBW() throws IllegalArgumentException {
+    if (null != fromReplica && fromReplica.getState() == ReplicaState.RBW) {
+      return new ReplicaBeingWritten((ReplicaBeingWritten) fromReplica);
+    } else if (null != fromReplica) {
+      throw new IllegalArgumentException("Incompatible fromReplica "
+          + "state: " + fromReplica.getState());
+    } else {
+      if (null != block) {
+        if (null == writer) {
+          throw new IllegalArgumentException("A valid writer is "
+              + "required for constructing a RBW from block "
+              + block.getBlockId());
+        }
+        return new ReplicaBeingWritten(block, volume, directoryUsed, writer);
+      } else {
+        if (length != -1) {
+          return new ReplicaBeingWritten(blockId, length, genStamp,
+              volume, directoryUsed, writer, bytesToReserve);
+        } else {
+          return new ReplicaBeingWritten(blockId, genStamp, volume,
+              directoryUsed, bytesToReserve);
+        }
+      }
+    }
+  }
+
+  private LocalReplicaInPipeline buildTemporaryReplica()
+      throws IllegalArgumentException {
+    if (null != fromReplica &&
+        fromReplica.getState() == ReplicaState.TEMPORARY) {
+      return new LocalReplicaInPipeline((LocalReplicaInPipeline) fromReplica);
+    } else if (null != fromReplica) {
+      throw new IllegalArgumentException("Incompatible fromReplica "
+          + "state: " + fromReplica.getState());
+    } else {
+      if (null != block) {
+        if (null == writer) {
+          throw new IllegalArgumentException("A valid writer is "
+              + "required for constructing a Replica from block "
+              + block.getBlockId());
+        }
+        return new LocalReplicaInPipeline(block, volume, directoryUsed,
+            writer);
+      } else {
+        if (length != -1) {
+          return new LocalReplicaInPipeline(blockId, length, genStamp,
+              volume, directoryUsed, writer, bytesToReserve);
+        } else {
+          return new LocalReplicaInPipeline(blockId, genStamp, volume,
+              directoryUsed, bytesToReserve);
+        }
+      }
+    }
+  }
+
+  private ReplicaInfo buildFinalizedReplica() throws IllegalArgumentException {
+    if (null != fromReplica &&
+        fromReplica.getState() == ReplicaState.FINALIZED) {
+      return new FinalizedReplica((FinalizedReplica)fromReplica);
+    } else if (null != this.fromReplica) {
+      throw new IllegalArgumentException("Incompatible fromReplica "
+          + "state: " + fromReplica.getState());
+    } else {
+      if (null != block) {
+        return new FinalizedReplica(block, volume, directoryUsed);
+      } else {
+        return new FinalizedReplica(blockId, length, genStamp, volume,
+            directoryUsed);
+      }
+    }
+  }
+
+  private ReplicaInfo buildRWR() throws IllegalArgumentException {
+
+    if (null != fromReplica && fromReplica.getState() == ReplicaState.RWR) {
+      return new ReplicaWaitingToBeRecovered(
+          (ReplicaWaitingToBeRecovered) fromReplica);
+    } else if (null != fromReplica){
+      throw new IllegalArgumentException("Incompatible fromReplica "
+          + "state: " + fromReplica.getState());
+    } else {
+      if (null != block) {
+        return new ReplicaWaitingToBeRecovered(block, volume, directoryUsed);
+      } else {
+        return new ReplicaWaitingToBeRecovered(blockId, length, genStamp,
+            volume, directoryUsed);
+      }
+    }
+  }
+
+  private ReplicaInfo buildRUR() throws IllegalArgumentException {
+    if (null == fromReplica) {
+      throw new IllegalArgumentException(
+          "Missing a valid replica to recover from");
+    }
+    if (null != writer || null != block) {
+      throw new IllegalArgumentException("Invalid state for "
+          + "recovering from replica with blk id "
+          + fromReplica.getBlockId());
+    }
+    if (fromReplica.getState() == ReplicaState.RUR) {
+      return new ReplicaUnderRecovery((ReplicaUnderRecovery) fromReplica);
+    } else {
+      return new ReplicaUnderRecovery(fromReplica, recoveryId);
+    }
+  }
+
+  public ReplicaInfo build() throws IllegalArgumentException {
+    ReplicaInfo info = null;
+    switch(this.state) {
+    case FINALIZED:
+      info = buildFinalizedReplica();
+      break;
+    case RWR:
+      info = buildRWR();
+      break;
+    case RUR:
+      info = buildRUR();
+      break;
+    case RBW:
+    case TEMPORARY:
+      info = buildLocalReplicaInPipeline();
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown replica state " + state);
+    }
+    return info;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java
index b563d7f..ddc9f9f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java
@@ -27,11 +27,11 @@ import java.io.IOException;
  * the fs volume where this replica is located.
  */
 public class ReplicaHandler implements Closeable {
-  private final ReplicaInPipelineInterface replica;
+  private final ReplicaInPipeline replica;
   private final FsVolumeReference volumeReference;
 
   public ReplicaHandler(
-      ReplicaInPipelineInterface replica, FsVolumeReference reference) {
+      ReplicaInPipeline replica, FsVolumeReference reference) {
     this.replica = replica;
     this.volumeReference = reference;
   }
@@ -43,7 +43,7 @@ public class ReplicaHandler implements Closeable {
     }
   }
 
-  public ReplicaInPipelineInterface getReplica() {
+  public ReplicaInPipeline getReplica() {
     return replica;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index 7326846..efa6ea6 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -17,313 +17,91 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.RandomAccessFile;
-import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.StringUtils;
 
 /** 
- * This class defines a replica in a pipeline, which
- * includes a persistent replica being written to by a dfs client or
- * a temporary replica being replicated by a source datanode or
- * being copied for the balancing purpose.
- * 
- * The base class implements a temporary replica
+ * This defines the interface of a replica in Pipeline that's being written to
  */
-public class ReplicaInPipeline extends ReplicaInfo
-                        implements ReplicaInPipelineInterface {
-  private long bytesAcked;
-  private long bytesOnDisk;
-  private byte[] lastChecksum;  
-  private AtomicReference<Thread> writer = new AtomicReference<Thread>();
-
+public interface ReplicaInPipeline extends Replica {
   /**
-   * Bytes reserved for this replica on the containing volume.
-   * Based off difference between the estimated maximum block length and
-   * the bytes already written to this block.
+   * Set the number of bytes received
+   * @param bytesReceived number of bytes received
    */
-  private long bytesReserved;
-  private final long originalBytesReserved;
+  void setNumBytes(long bytesReceived);
 
   /**
-   * Constructor for a zero length replica
-   * @param blockId block id
-   * @param genStamp replica generation stamp
-   * @param vol volume where replica is located
-   * @param dir directory path where block and meta files are located
-   * @param bytesToReserve disk space to reserve for this replica, based on
-   *                       the estimated maximum block length.
+   * Get the number of bytes acked
+   * @return the number of bytes acked
    */
-  public ReplicaInPipeline(long blockId, long genStamp, 
-        FsVolumeSpi vol, File dir, long bytesToReserve) {
-    this(blockId, 0L, genStamp, vol, dir, Thread.currentThread(), 
bytesToReserve);
-  }
+  long getBytesAcked();
 
   /**
-   * Constructor
-   * @param block a block
-   * @param vol volume where replica is located
-   * @param dir directory path where block and meta files are located
-   * @param writer a thread that is writing to this replica
+   * Set the number bytes that have acked
+   * @param bytesAcked number bytes acked
    */
-  ReplicaInPipeline(Block block, 
-      FsVolumeSpi vol, File dir, Thread writer) {
-    this( block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
-        vol, dir, writer, 0L);
-  }
+  void setBytesAcked(long bytesAcked);
 
   /**
-   * Constructor
-   * @param blockId block id
-   * @param len replica length
-   * @param genStamp replica generation stamp
-   * @param vol volume where replica is located
-   * @param dir directory path where block and meta files are located
-   * @param writer a thread that is writing to this replica
-   * @param bytesToReserve disk space to reserve for this replica, based on
-   *                       the estimated maximum block length.
+   * Release any disk space reserved for this replica.
    */
-  ReplicaInPipeline(long blockId, long len, long genStamp,
-      FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) {
-    super( blockId, len, genStamp, vol, dir);
-    this.bytesAcked = len;
-    this.bytesOnDisk = len;
-    this.writer.set(writer);
-    this.bytesReserved = bytesToReserve;
-    this.originalBytesReserved = bytesToReserve;
-  }
+  public void releaseAllBytesReserved();
 
   /**
-   * Copy constructor.
-   * @param from where to copy from
+   * store the checksum for the last chunk along with the data length
+   * @param dataLength number of bytes on disk
+   * @param lastChecksum - checksum bytes for the last chunk
    */
-  public ReplicaInPipeline(ReplicaInPipeline from) {
-    super(from);
-    this.bytesAcked = from.getBytesAcked();
-    this.bytesOnDisk = from.getBytesOnDisk();
-    this.writer.set(from.writer.get());
-    this.bytesReserved = from.bytesReserved;
-    this.originalBytesReserved = from.originalBytesReserved;
-  }
-
-  @Override
-  public long getVisibleLength() {
-    return -1;
-  }
-  
-  @Override  //ReplicaInfo
-  public ReplicaState getState() {
-    return ReplicaState.TEMPORARY;
-  }
+  public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum);
   
-  @Override // ReplicaInPipelineInterface
-  public long getBytesAcked() {
-    return bytesAcked;
-  }
+  /**
+   * gets the last chunk checksum and the length of the block corresponding
+   * to that checksum
+   */
+  public ChunkChecksum getLastChecksumAndDataLen();
   
-  @Override // ReplicaInPipelineInterface
-  public void setBytesAcked(long bytesAcked) {
-    long newBytesAcked = bytesAcked - this.bytesAcked;
-    this.bytesAcked = bytesAcked;
+  /**
+   * Create output streams for writing to this replica,
+   * one for block file and one for CRC file
+   *
+   * @param isCreate if it is for creation
+   * @param requestedChecksum the checksum the writer would prefer to use
+   * @return output streams for writing
+   * @throws IOException if any error occurs
+   */
+  public ReplicaOutputStreams createStreams(boolean isCreate,
+      DataChecksum requestedChecksum) throws IOException;
 
-    // Once bytes are ACK'ed we can release equivalent space from the
-    // volume's reservedForRbw count. We could have released it as soon
-    // as the write-to-disk completed but that would be inefficient.
-    getVolume().releaseReservedSpace(newBytesAcked);
-    bytesReserved -= newBytesAcked;
-  }
+  /**
+   * Create an output stream to write restart metadata in case of datanode
+   * shutting down for quick restart.
+   *
+   * @return output stream for writing.
+   * @throws IOException if any error occurs
+   */
+  public OutputStream createRestartMetaStream() throws IOException;
   
-  @Override // ReplicaInPipelineInterface
-  public long getBytesOnDisk() {
-    return bytesOnDisk;
-  }
-
-  @Override
-  public long getBytesReserved() {
-    return bytesReserved;
-  }
+  ReplicaInfo getReplicaInfo();
   
-  @Override
-  public long getOriginalBytesReserved() {
-    return originalBytesReserved;
-  }
-
-  @Override
-  public void releaseAllBytesReserved() {  // ReplicaInPipelineInterface
-    getVolume().releaseReservedSpace(bytesReserved);
-    getVolume().releaseLockedMemory(bytesReserved);
-    bytesReserved = 0;
-  }
-
-  @Override // ReplicaInPipelineInterface
-  public synchronized void setLastChecksumAndDataLen(long dataLength, byte[] 
lastChecksum) {
-    this.bytesOnDisk = dataLength;
-    this.lastChecksum = lastChecksum;
-  }
+  /**
+   * Set the thread that is writing to this replica
+   * @param writer a thread writing to this replica
+   */
+  void setWriter(Thread writer);
   
-  @Override // ReplicaInPipelineInterface
-  public synchronized ChunkChecksum getLastChecksumAndDataLen() {
-    return new ChunkChecksum(getBytesOnDisk(), lastChecksum);
-  }
-
-  public void interruptThread() {
-    Thread thread = writer.get();
-    if (thread != null && thread != Thread.currentThread() 
-        && thread.isAlive()) {
-      thread.interrupt();
-    }
-  }
-
-  @Override  // Object
-  public boolean equals(Object o) {
-    return super.equals(o);
-  }
+  void interruptThread();
   
   /**
    * Attempt to set the writer to a new value.
    */
-  public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) {
-    return writer.compareAndSet(prevWriter, newWriter);
-  }
+  boolean attemptToSetWriter(Thread prevWriter, Thread newWriter);
 
   /**
-   * Interrupt the writing thread and wait until it dies
+   * Interrupt the writing thread and wait until it dies.
    * @throws IOException the waiting is interrupted
    */
-  public void stopWriter(long xceiverStopTimeout) throws IOException {
-    while (true) {
-      Thread thread = writer.get();
-      if ((thread == null) || (thread == Thread.currentThread()) ||
-          (!thread.isAlive())) {
-        if (writer.compareAndSet(thread, null) == true) {
-          return; // Done
-        }
-        // The writer changed.  Go back to the start of the loop and attempt to
-        // stop the new writer.
-        continue;
-      }
-      thread.interrupt();
-      try {
-        thread.join(xceiverStopTimeout);
-        if (thread.isAlive()) {
-          // Our thread join timed out.
-          final String msg = "Join on writer thread " + thread + " timed out";
-          DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(thread));
-          throw new IOException(msg);
-        }
-      } catch (InterruptedException e) {
-        throw new IOException("Waiting for writer thread is interrupted.");
-      }
-    }
-  }
-
-  @Override  // Object
-  public int hashCode() {
-    return super.hashCode();
-  }
-  
-  @Override // ReplicaInPipelineInterface
-  public ReplicaOutputStreams createStreams(boolean isCreate, 
-      DataChecksum requestedChecksum) throws IOException {
-    File blockFile = getBlockFile();
-    File metaFile = getMetaFile();
-    if (DataNode.LOG.isDebugEnabled()) {
-      DataNode.LOG.debug("writeTo blockfile is " + blockFile +
-                         " of size " + blockFile.length());
-      DataNode.LOG.debug("writeTo metafile is " + metaFile +
-                         " of size " + metaFile.length());
-    }
-    long blockDiskSize = 0L;
-    long crcDiskSize = 0L;
-    
-    // the checksum that should actually be used -- this
-    // may differ from requestedChecksum for appends.
-    final DataChecksum checksum;
-    
-    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
-    
-    if (!isCreate) {
-      // For append or recovery, we must enforce the existing checksum.
-      // Also, verify that the file has correct lengths, etc.
-      boolean checkedMeta = false;
-      try {
-        BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
-        checksum = header.getChecksum();
-        
-        if (checksum.getBytesPerChecksum() !=
-            requestedChecksum.getBytesPerChecksum()) {
-          throw new IOException("Client requested checksum " +
-              requestedChecksum + " when appending to an existing block " +
-              "with different chunk size: " + checksum);
-        }
-        
-        int bytesPerChunk = checksum.getBytesPerChecksum();
-        int checksumSize = checksum.getChecksumSize();
-        
-        blockDiskSize = bytesOnDisk;
-        crcDiskSize = BlockMetadataHeader.getHeaderSize() +
-          (blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize;
-        if (blockDiskSize>0 && 
-            (blockDiskSize>blockFile.length() || 
crcDiskSize>metaFile.length())) {
-          throw new IOException("Corrupted block: " + this);
-        }
-        checkedMeta = true;
-      } finally {
-        if (!checkedMeta) {
-          // clean up in case of exceptions.
-          IOUtils.closeStream(metaRAF);
-        }
-      }
-    } else {
-      // for create, we can use the requested checksum
-      checksum = requestedChecksum;
-    }
-    
-    FileOutputStream blockOut = null;
-    FileOutputStream crcOut = null;
-    try {
-      blockOut = new FileOutputStream(
-          new RandomAccessFile( blockFile, "rw" ).getFD() );
-      crcOut = new FileOutputStream(metaRAF.getFD() );
-      if (!isCreate) {
-        blockOut.getChannel().position(blockDiskSize);
-        crcOut.getChannel().position(crcDiskSize);
-      }
-      return new ReplicaOutputStreams(blockOut, crcOut, checksum,
-          getVolume().isTransientStorage());
-    } catch (IOException e) {
-      IOUtils.closeStream(blockOut);
-      IOUtils.closeStream(metaRAF);
-      throw e;
-    }
-  }
-
-  @Override
-  public OutputStream createRestartMetaStream() throws IOException {
-    File blockFile = getBlockFile();
-    File restartMeta = new File(blockFile.getParent()  +
-        File.pathSeparator + "." + blockFile.getName() + ".restart");
-    if (restartMeta.exists() && !restartMeta.delete()) {
-      DataNode.LOG.warn("Failed to delete restart meta file: " +
-          restartMeta.getPath());
-    }
-    return new FileOutputStream(restartMeta);
-  }
-
-  @Override
-  public String toString() {
-    return super.toString()
-        + "\n  bytesAcked=" + bytesAcked
-        + "\n  bytesOnDisk=" + bytesOnDisk;
-  }
+  void stopWriter(long xceiverStopTimeout) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
deleted file mode 100644
index ef9f3e2..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.util.DataChecksum;
-
-/** 
- * This defines the interface of a replica in Pipeline that's being written to
- */
-public interface ReplicaInPipelineInterface extends Replica {
-  /**
-   * Set the number of bytes received
-   * @param bytesReceived number of bytes received
-   */
-  void setNumBytes(long bytesReceived);
-  
-  /**
-   * Get the number of bytes acked
-   * @return the number of bytes acked
-   */
-  long getBytesAcked();
-  
-  /**
-   * Set the number bytes that have acked
-   * @param bytesAcked number bytes acked
-   */
-  void setBytesAcked(long bytesAcked);
-  
-  /**
-   * Release any disk space reserved for this replica.
-   */
-  public void releaseAllBytesReserved();
-
-  /**
-   * store the checksum for the last chunk along with the data length
-   * @param dataLength number of bytes on disk
-   * @param lastChecksum - checksum bytes for the last chunk
-   */
-  public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum);
-  
-  /**
-   * gets the last chunk checksum and the length of the block corresponding
-   * to that checksum
-   */
-  public ChunkChecksum getLastChecksumAndDataLen();
-  
-  /**
-   * Create output streams for writing to this replica, 
-   * one for block file and one for CRC file
-   * 
-   * @param isCreate if it is for creation
-   * @param requestedChecksum the checksum the writer would prefer to use
-   * @return output streams for writing
-   * @throws IOException if any error occurs
-   */
-  public ReplicaOutputStreams createStreams(boolean isCreate,
-      DataChecksum requestedChecksum) throws IOException;
-
-  /**
-   * Create an output stream to write restart metadata in case of datanode
-   * shutting down for quick restart.
-   *
-   * @return output stream for writing.
-   * @throws IOException if any error occurs
-   */
-  public OutputStream createRestartMetaStream() throws IOException;
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to