HDFS-10636. Modify ReplicaInfo to remove the assumption that replica metadata and data are stored in java.io.File. (Virajith Jalaparti via lei)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/86c9862b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/86c9862b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/86c9862b Branch: refs/heads/HADOOP-12756 Commit: 86c9862bec0248d671e657aa56094a2919b8ac14 Parents: 1c0d18f Author: Lei Xu <l...@apache.org> Authored: Tue Sep 13 12:53:37 2016 -0700 Committer: Lei Xu <l...@apache.org> Committed: Tue Sep 13 12:54:14 2016 -0700 ---------------------------------------------------------------------- .../server/datanode/BlockPoolSliceStorage.java | 16 +- .../hdfs/server/datanode/BlockReceiver.java | 2 +- .../hdfs/server/datanode/BlockSender.java | 7 +- .../hadoop/hdfs/server/datanode/DataNode.java | 2 +- .../server/datanode/DataNodeFaultInjector.java | 2 +- .../hdfs/server/datanode/DataStorage.java | 4 +- .../hdfs/server/datanode/DirectoryScanner.java | 10 +- .../hdfs/server/datanode/FinalizedReplica.java | 27 +- .../hdfs/server/datanode/LocalReplica.java | 479 ++++++++++ .../server/datanode/LocalReplicaInPipeline.java | 417 +++++++++ .../server/datanode/ReplicaBeingWritten.java | 16 +- .../hdfs/server/datanode/ReplicaBuilder.java | 252 +++++ .../hdfs/server/datanode/ReplicaHandler.java | 6 +- .../hdfs/server/datanode/ReplicaInPipeline.java | 324 ++----- .../datanode/ReplicaInPipelineInterface.java | 86 -- .../hdfs/server/datanode/ReplicaInfo.java | 370 ++++---- .../server/datanode/ReplicaUnderRecovery.java | 30 +- .../datanode/ReplicaWaitingToBeRecovered.java | 27 +- .../server/datanode/fsdataset/FsDatasetSpi.java | 9 +- .../datanode/fsdataset/impl/BlockPoolSlice.java | 74 +- .../impl/FsDatasetAsyncDiskService.java | 71 +- .../datanode/fsdataset/impl/FsDatasetImpl.java | 908 +++++++------------ .../datanode/fsdataset/impl/FsDatasetUtil.java | 18 + .../datanode/fsdataset/impl/FsVolumeImpl.java | 154 +++- .../datanode/fsdataset/impl/FsVolumeList.java | 2 +- .../impl/RamDiskAsyncLazyPersistService.java | 34 +- .../TestClientProtocolForPipelineRecovery.java | 4 +- .../apache/hadoop/hdfs/TestCrcCorruption.java | 6 +- .../server/datanode/SimulatedFSDataset.java | 30 +- .../datanode/TestBlockPoolSliceStorage.java | 6 +- .../hdfs/server/datanode/TestBlockRecovery.java | 2 +- .../datanode/TestDataNodeRollingUpgrade.java | 6 +- .../server/datanode/TestDirectoryScanner.java | 17 +- .../server/datanode/TestSimulatedFSDataset.java | 2 +- .../hdfs/server/datanode/TestTransferRbw.java | 4 +- .../extdataset/ExternalDatasetImpl.java | 6 +- .../extdataset/ExternalReplicaInPipeline.java | 26 +- .../extdataset/TestExternalDataset.java | 4 +- .../fsdataset/impl/FsDatasetImplTestUtils.java | 43 +- .../fsdataset/impl/FsDatasetTestUtil.java | 20 +- .../fsdataset/impl/TestWriteToReplica.java | 4 +- 41 files changed, 2219 insertions(+), 1308 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java index fd90ae9..fd89611 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -741,7 +742,20 @@ public class BlockPoolSliceStorage extends Storage { * * @return the trash directory for a given block file that is being deleted. */ - public String getTrashDirectory(File blockFile) { + public String getTrashDirectory(ReplicaInfo info) { + + URI blockURI = info.getBlockURI(); + try{ + File blockFile = new File(blockURI); + return getTrashDirectory(blockFile); + } catch (IllegalArgumentException e) { + LOG.warn("Failed to get block file for replica " + info, e); + } + + return null; + } + + private String getTrashDirectory(File blockFile) { if (isTrashAllowed(blockFile)) { Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent()); String trashDirectory = matcher.replaceFirst("$1$2" + TRASH_ROOT_DIR + "$4"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 522d577..39419c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -121,7 +121,7 @@ class BlockReceiver implements Closeable { /** the block to receive */ private final ExtendedBlock block; /** the replica to write */ - private ReplicaInPipelineInterface replicaInfo; + private ReplicaInPipeline replicaInfo; /** pipeline stage */ private final BlockConstructionStage stage; private final boolean isTransfer; http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 9d9502b..c3ba2eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.util.DataTransferThrottler; @@ -248,8 +249,8 @@ class BlockSender implements java.io.Closeable { } // if there is a write in progress ChunkChecksum chunkChecksum = null; - if (replica instanceof ReplicaBeingWritten) { - final ReplicaBeingWritten rbw = (ReplicaBeingWritten)replica; + if (replica.getState() == ReplicaState.RBW) { + final ReplicaInPipeline rbw = (ReplicaInPipeline) replica; waitForMinLength(rbw, startOffset + length); chunkChecksum = rbw.getLastChecksumAndDataLen(); } @@ -473,7 +474,7 @@ class BlockSender implements java.io.Closeable { * @param len minimum length to reach * @throws IOException on failing to reach the len in given wait time */ - private static void waitForMinLength(ReplicaBeingWritten rbw, long len) + private static void waitForMinLength(ReplicaInPipeline rbw, long len) throws IOException { // Wait for 3 seconds for rbw replica to reach the minimum length for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; i++) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 0025040..09ecac1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -3474,4 +3474,4 @@ public class DataNode extends ReconfigurableBase void setBlockScanner(BlockScanner blockScanner) { this.blockScanner = blockScanner; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java index 931c124..aa06aa1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java @@ -56,6 +56,6 @@ public class DataNodeFaultInjector { public void failMirrorConnection() throws IOException { } - public void failPipeline(ReplicaInPipelineInterface replicaInfo, + public void failPipeline(ReplicaInPipeline replicaInfo, String mirrorAddr) throws IOException { } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index 0e6b339..7e620c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -204,9 +204,9 @@ public class DataStorage extends Storage { * @return trash directory if rolling upgrade is in progress, null * otherwise. */ - public String getTrashDirectoryForBlockFile(String bpid, File blockFile) { + public String getTrashDirectoryForReplica(String bpid, ReplicaInfo info) { if (trashEnabledBpids.contains(bpid)) { - return getBPStorage(bpid).getTrashDirectory(blockFile); + return getBPStorage(bpid).getTrashDirectory(info); } return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java index f9ebab9..c50bfaf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java @@ -597,14 +597,14 @@ public class DirectoryScanner implements Runnable { diffs.put(bpid, diffRecord); statsRecord.totalBlocks = blockpoolReport.length; - List<FinalizedReplica> bl = dataset.getFinalizedBlocks(bpid); - FinalizedReplica[] memReport = bl.toArray(new FinalizedReplica[bl.size()]); + List<ReplicaInfo> bl = dataset.getFinalizedBlocks(bpid); + ReplicaInfo[] memReport = bl.toArray(new ReplicaInfo[bl.size()]); Arrays.sort(memReport); // Sort based on blockId int d = 0; // index for blockpoolReport int m = 0; // index for memReprot while (m < memReport.length && d < blockpoolReport.length) { - FinalizedReplica memBlock = memReport[m]; + ReplicaInfo memBlock = memReport[m]; ScanInfo info = blockpoolReport[d]; if (info.getBlockId() < memBlock.getBlockId()) { if (!dataset.isDeletingBlock(bpid, info.getBlockId())) { @@ -633,7 +633,7 @@ public class DirectoryScanner implements Runnable { // or block file length is different than expected statsRecord.mismatchBlocks++; addDifference(diffRecord, statsRecord, info); - } else if (info.getBlockFile().compareTo(memBlock.getBlockFile()) != 0) { + } else if (memBlock.compareWith(info) != 0) { // volumeMap record and on-disk files don't match. statsRecord.duplicateBlocks++; addDifference(diffRecord, statsRecord, info); @@ -652,7 +652,7 @@ public class DirectoryScanner implements Runnable { } } while (m < memReport.length) { - FinalizedReplica current = memReport[m++]; + ReplicaInfo current = memReport[m++]; addDifference(diffRecord, statsRecord, current.getBlockId(), current.getVolume()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java index 8daeb51..81a4ab4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java @@ -22,11 +22,12 @@ import java.io.File; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; /** * This class describes a replica that has been finalized. */ -public class FinalizedReplica extends ReplicaInfo { +public class FinalizedReplica extends LocalReplica { /** * Constructor @@ -88,4 +89,28 @@ public class FinalizedReplica extends ReplicaInfo { public String toString() { return super.toString(); } + + @Override + public ReplicaInfo getOriginalReplica() { + throw new UnsupportedOperationException("Replica of type " + getState() + + " does not support getOriginalReplica"); + } + + @Override + public long getRecoveryID() { + throw new UnsupportedOperationException("Replica of type " + getState() + + " does not support getRecoveryID"); + } + + @Override + public void setRecoveryID(long recoveryId) { + throw new UnsupportedOperationException("Replica of type " + getState() + + " does not support setRecoveryID"); + } + + @Override + public ReplicaRecoveryInfo createInfo() { + throw new UnsupportedOperationException("Replica of type " + getState() + + " does not support createInfo"); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java new file mode 100644 index 0000000..cbfc9a5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java @@ -0,0 +1,479 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.HardLink; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfo; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.util.DataChecksum; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This class is used for all replicas which are on local storage media + * and hence, are backed by files. + */ +abstract public class LocalReplica extends ReplicaInfo { + + /** + * Base directory containing numerically-identified sub directories and + * possibly blocks. + */ + private File baseDir; + + /** + * Whether or not this replica's parent directory includes subdirs, in which + * case we can generate them based on the replica's block ID + */ + private boolean hasSubdirs; + + private static final Map<String, File> internedBaseDirs = new HashMap<String, File>(); + + static final Log LOG = LogFactory.getLog(LocalReplica.class); + private final static boolean IS_NATIVE_IO_AVAIL; + static { + IS_NATIVE_IO_AVAIL = NativeIO.isAvailable(); + if (Path.WINDOWS && !IS_NATIVE_IO_AVAIL) { + LOG.warn("Data node cannot fully support concurrent reading" + + " and writing without native code extensions on Windows."); + } + } + + /** + * Constructor + * @param block a block + * @param vol volume where replica is located + * @param dir directory path where block and meta files are located + */ + LocalReplica(Block block, FsVolumeSpi vol, File dir) { + this(block.getBlockId(), block.getNumBytes(), + block.getGenerationStamp(), vol, dir); + } + + /** + * Constructor + * @param blockId block id + * @param len replica length + * @param genStamp replica generation stamp + * @param vol volume where replica is located + * @param dir directory path where block and meta files are located + */ + LocalReplica(long blockId, long len, long genStamp, + FsVolumeSpi vol, File dir) { + super(vol, blockId, len, genStamp); + setDirInternal(dir); + } + + /** + * Copy constructor. + * @param from the source replica + */ + LocalReplica(LocalReplica from) { + this(from, from.getVolume(), from.getDir()); + } + + /** + * Get the full path of this replica's data file. + * @return the full path of this replica's data file + */ + @VisibleForTesting + public File getBlockFile() { + return new File(getDir(), getBlockName()); + } + + /** + * Get the full path of this replica's meta file. + * @return the full path of this replica's meta file + */ + @VisibleForTesting + public File getMetaFile() { + return new File(getDir(), + DatanodeUtil.getMetaName(getBlockName(), getGenerationStamp())); + } + + /** + * Return the parent directory path where this replica is located. + * @return the parent directory path where this replica is located + */ + protected File getDir() { + return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir, + getBlockId()) : baseDir; + } + + /** + * Set the parent directory where this replica is located. + * @param dir the parent directory where the replica is located + */ + private void setDirInternal(File dir) { + if (dir == null) { + baseDir = null; + return; + } + + ReplicaDirInfo dirInfo = parseBaseDir(dir); + this.hasSubdirs = dirInfo.hasSubidrs; + + synchronized (internedBaseDirs) { + if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) { + // Create a new String path of this file and make a brand new File object + // to guarantee we drop the reference to the underlying char[] storage. + File baseDir = new File(dirInfo.baseDirPath); + internedBaseDirs.put(dirInfo.baseDirPath, baseDir); + } + this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath); + } + } + + @VisibleForTesting + public static class ReplicaDirInfo { + public String baseDirPath; + public boolean hasSubidrs; + + public ReplicaDirInfo (String baseDirPath, boolean hasSubidrs) { + this.baseDirPath = baseDirPath; + this.hasSubidrs = hasSubidrs; + } + } + + @VisibleForTesting + public static ReplicaDirInfo parseBaseDir(File dir) { + + File currentDir = dir; + boolean hasSubdirs = false; + while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) { + hasSubdirs = true; + currentDir = currentDir.getParentFile(); + } + + return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs); + } + + /** + * Copy specified file into a temporary file. Then rename the + * temporary file to the original name. This will cause any + * hardlinks to the original file to be removed. The temporary + * files are created in the same directory. The temporary files will + * be recovered (especially on Windows) on datanode restart. + */ + private void breakHardlinks(File file, Block b) throws IOException { + File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file)); + try (FileInputStream in = new FileInputStream(file)) { + try (FileOutputStream out = new FileOutputStream(tmpFile)){ + IOUtils.copyBytes(in, out, 16 * 1024); + } + if (file.length() != tmpFile.length()) { + throw new IOException("Copy of file " + file + " size " + file.length()+ + " into file " + tmpFile + + " resulted in a size of " + tmpFile.length()); + } + FileUtil.replaceFile(tmpFile, file); + } catch (IOException e) { + boolean done = tmpFile.delete(); + if (!done) { + DataNode.LOG.info("detachFile failed to delete temporary file " + + tmpFile); + } + throw e; + } + } + + /** + * This function "breaks hardlinks" to the current replica file. + * + * When doing a DataNode upgrade, we create a bunch of hardlinks to each block + * file. This cleverly ensures that both the old and the new storage + * directories can contain the same block file, without using additional space + * for the data. + * + * However, when we want to append to the replica file, we need to "break" the + * hardlink to ensure that the old snapshot continues to contain the old data + * length. If we failed to do that, we could roll back to the previous/ + * directory during a downgrade, and find that the block contents were longer + * than they were at the time of upgrade. + * + * @return true only if data was copied. + * @throws IOException + */ + public boolean breakHardLinksIfNeeded() throws IOException { + File file = getBlockFile(); + if (file == null || getVolume() == null) { + throw new IOException("detachBlock:Block not found. " + this); + } + File meta = getMetaFile(); + + int linkCount = HardLink.getLinkCount(file); + if (linkCount > 1) { + DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " + + "block " + this); + breakHardlinks(file, this); + } + if (HardLink.getLinkCount(meta) > 1) { + breakHardlinks(meta, this); + } + return true; + } + + @Override + public URI getBlockURI() { + return getBlockFile().toURI(); + } + + @Override + public InputStream getDataInputStream(long seekOffset) throws IOException { + + File blockFile = getBlockFile(); + if (IS_NATIVE_IO_AVAIL) { + return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset); + } else { + try { + return FsDatasetUtil.openAndSeek(blockFile, seekOffset); + } catch (FileNotFoundException fnfe) { + throw new IOException("Block " + this + " is not valid. " + + "Expected block file at " + blockFile + " does not exist."); + } + } + } + + @Override + public OutputStream getDataOutputStream(boolean append) throws IOException { + return new FileOutputStream(getBlockFile(), append); + } + + @Override + public boolean blockDataExists() { + return getBlockFile().exists(); + } + + @Override + public boolean deleteBlockData() { + return getBlockFile().delete(); + } + + @Override + public long getBlockDataLength() { + return getBlockFile().length(); + } + + @Override + public URI getMetadataURI() { + return getMetaFile().toURI(); + } + + @Override + public LengthInputStream getMetadataInputStream(long offset) + throws IOException { + File meta = getMetaFile(); + return new LengthInputStream( + FsDatasetUtil.openAndSeek(meta, offset), meta.length()); + } + + @Override + public OutputStream getMetadataOutputStream(boolean append) + throws IOException { + return new FileOutputStream(getMetaFile(), append); + } + + @Override + public boolean metadataExists() { + return getMetaFile().exists(); + } + + @Override + public boolean deleteMetadata() { + return getMetaFile().delete(); + } + + @Override + public long getMetadataLength() { + return getMetaFile().length(); + } + + @Override + public boolean renameMeta(URI destURI) throws IOException { + return renameFile(getMetaFile(), new File(destURI)); + } + + @Override + public boolean renameData(URI destURI) throws IOException { + return renameFile(getBlockFile(), new File(destURI)); + } + + private boolean renameFile(File srcfile, File destfile) throws IOException { + try { + NativeIO.renameTo(srcfile, destfile); + return true; + } catch (IOException e) { + throw new IOException("Failed to move block file for " + this + + " from " + srcfile + " to " + destfile.getAbsolutePath(), e); + } + } + + @Override + public void updateWithReplica(StorageLocation replicaLocation) { + // for local replicas, the replica location is assumed to be a file. + File diskFile = replicaLocation.getFile(); + if (null == diskFile) { + setDirInternal(null); + } else { + setDirInternal(diskFile.getParentFile()); + } + } + + @Override + public boolean getPinning(LocalFileSystem localFS) throws IOException { + FileStatus fss = + localFS.getFileStatus(new Path(getBlockFile().getAbsolutePath())); + return fss.getPermission().getStickyBit(); + } + + @Override + public void setPinning(LocalFileSystem localFS) throws IOException { + File f = getBlockFile(); + Path p = new Path(f.getAbsolutePath()); + + FsPermission oldPermission = localFS.getFileStatus( + new Path(f.getAbsolutePath())).getPermission(); + //sticky bit is used for pinning purpose + FsPermission permission = new FsPermission(oldPermission.getUserAction(), + oldPermission.getGroupAction(), oldPermission.getOtherAction(), true); + localFS.setPermission(p, permission); + } + + @Override + public void bumpReplicaGS(long newGS) throws IOException { + long oldGS = getGenerationStamp(); + File oldmeta = getMetaFile(); + setGenerationStamp(newGS); + File newmeta = getMetaFile(); + + // rename meta file to new GS + if (LOG.isDebugEnabled()) { + LOG.debug("Renaming " + oldmeta + " to " + newmeta); + } + try { + // calling renameMeta on the ReplicaInfo doesn't work here + NativeIO.renameTo(oldmeta, newmeta); + } catch (IOException e) { + setGenerationStamp(oldGS); // restore old GS + throw new IOException("Block " + this + " reopen failed. " + + " Unable to move meta file " + oldmeta + + " to " + newmeta, e); + } + } + + @Override + public void truncateBlock(long newLength) throws IOException { + truncateBlock(getBlockFile(), getMetaFile(), getNumBytes(), newLength); + } + + @Override + public int compareWith(ScanInfo info) { + return info.getBlockFile().compareTo(getBlockFile()); + } + + static public void truncateBlock(File blockFile, File metaFile, + long oldlen, long newlen) throws IOException { + LOG.info("truncateBlock: blockFile=" + blockFile + + ", metaFile=" + metaFile + + ", oldlen=" + oldlen + + ", newlen=" + newlen); + + if (newlen == oldlen) { + return; + } + if (newlen > oldlen) { + throw new IOException("Cannot truncate block to from oldlen (=" + oldlen + + ") to newlen (=" + newlen + ")"); + } + + DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum(); + int checksumsize = dcs.getChecksumSize(); + int bpc = dcs.getBytesPerChecksum(); + long n = (newlen - 1)/bpc + 1; + long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize; + long lastchunkoffset = (n - 1)*bpc; + int lastchunksize = (int)(newlen - lastchunkoffset); + byte[] b = new byte[Math.max(lastchunksize, checksumsize)]; + + RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw"); + try { + //truncate blockFile + blockRAF.setLength(newlen); + + //read last chunk + blockRAF.seek(lastchunkoffset); + blockRAF.readFully(b, 0, lastchunksize); + } finally { + blockRAF.close(); + } + + //compute checksum + dcs.update(b, 0, lastchunksize); + dcs.writeValue(b, 0, false); + + //update metaFile + RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw"); + try { + metaRAF.setLength(newmetalen); + metaRAF.seek(newmetalen - checksumsize); + metaRAF.write(b, 0, checksumsize); + } finally { + metaRAF.close(); + } + } + + @Override + public void copyMetadata(URI destination) throws IOException { + //for local replicas, we assume the destination URI is file + Storage.nativeCopyFileUnbuffered(getMetaFile(), + new File(destination), true); + } + + @Override + public void copyBlockdata(URI destination) throws IOException { + //for local replicas, we assume the destination URI is file + Storage.nativeCopyFileUnbuffered(getBlockFile(), + new File(destination), true); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java new file mode 100644 index 0000000..bc7bc6d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java @@ -0,0 +1,417 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; +import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.StringUtils; + +/** + * This class defines a replica in a pipeline, which + * includes a persistent replica being written to by a dfs client or + * a temporary replica being replicated by a source datanode or + * being copied for the balancing purpose. + * + * The base class implements a temporary replica + */ +public class LocalReplicaInPipeline extends LocalReplica + implements ReplicaInPipeline { + private long bytesAcked; + private long bytesOnDisk; + private byte[] lastChecksum; + private AtomicReference<Thread> writer = new AtomicReference<Thread>(); + + /** + * Bytes reserved for this replica on the containing volume. + * Based off difference between the estimated maximum block length and + * the bytes already written to this block. + */ + private long bytesReserved; + private final long originalBytesReserved; + + /** + * Constructor for a zero length replica. + * @param blockId block id + * @param genStamp replica generation stamp + * @param vol volume where replica is located + * @param dir directory path where block and meta files are located + * @param bytesToReserve disk space to reserve for this replica, based on + * the estimated maximum block length. + */ + public LocalReplicaInPipeline(long blockId, long genStamp, + FsVolumeSpi vol, File dir, long bytesToReserve) { + this(blockId, 0L, genStamp, vol, dir, Thread.currentThread(), + bytesToReserve); + } + + /** + * Constructor + * @param block a block + * @param vol volume where replica is located + * @param dir directory path where block and meta files are located + * @param writer a thread that is writing to this replica + */ + LocalReplicaInPipeline(Block block, + FsVolumeSpi vol, File dir, Thread writer) { + this(block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(), + vol, dir, writer, 0L); + } + + /** + * Constructor + * @param blockId block id + * @param len replica length + * @param genStamp replica generation stamp + * @param vol volume where replica is located + * @param dir directory path where block and meta files are located + * @param writer a thread that is writing to this replica + * @param bytesToReserve disk space to reserve for this replica, based on + * the estimated maximum block length. + */ + LocalReplicaInPipeline(long blockId, long len, long genStamp, + FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) { + super(blockId, len, genStamp, vol, dir); + this.bytesAcked = len; + this.bytesOnDisk = len; + this.writer.set(writer); + this.bytesReserved = bytesToReserve; + this.originalBytesReserved = bytesToReserve; + } + + /** + * Copy constructor. + * @param from where to copy from + */ + public LocalReplicaInPipeline(LocalReplicaInPipeline from) { + super(from); + this.bytesAcked = from.getBytesAcked(); + this.bytesOnDisk = from.getBytesOnDisk(); + this.writer.set(from.writer.get()); + this.bytesReserved = from.bytesReserved; + this.originalBytesReserved = from.originalBytesReserved; + } + + @Override + public long getVisibleLength() { + return -1; + } + + @Override //ReplicaInfo + public ReplicaState getState() { + return ReplicaState.TEMPORARY; + } + + @Override // ReplicaInPipeline + public long getBytesAcked() { + return bytesAcked; + } + + @Override // ReplicaInPipeline + public void setBytesAcked(long bytesAcked) { + long newBytesAcked = bytesAcked - this.bytesAcked; + this.bytesAcked = bytesAcked; + + // Once bytes are ACK'ed we can release equivalent space from the + // volume's reservedForRbw count. We could have released it as soon + // as the write-to-disk completed but that would be inefficient. + getVolume().releaseReservedSpace(newBytesAcked); + bytesReserved -= newBytesAcked; + } + + @Override // ReplicaInPipeline + public long getBytesOnDisk() { + return bytesOnDisk; + } + + @Override + public long getBytesReserved() { + return bytesReserved; + } + + @Override + public long getOriginalBytesReserved() { + return originalBytesReserved; + } + + @Override // ReplicaInPipeline + public void releaseAllBytesReserved() { + getVolume().releaseReservedSpace(bytesReserved); + getVolume().releaseLockedMemory(bytesReserved); + bytesReserved = 0; + } + + @Override // ReplicaInPipeline + public synchronized void setLastChecksumAndDataLen(long dataLength, + byte[] checksum) { + this.bytesOnDisk = dataLength; + this.lastChecksum = checksum; + } + + @Override // ReplicaInPipeline + public synchronized ChunkChecksum getLastChecksumAndDataLen() { + return new ChunkChecksum(getBytesOnDisk(), lastChecksum); + } + + @Override // ReplicaInPipeline + public void setWriter(Thread writer) { + this.writer.set(writer); + } + + @Override + public void interruptThread() { + Thread thread = writer.get(); + if (thread != null && thread != Thread.currentThread() + && thread.isAlive()) { + thread.interrupt(); + } + } + + @Override // Object + public boolean equals(Object o) { + return super.equals(o); + } + + /** + * Attempt to set the writer to a new value. + */ + @Override // ReplicaInPipeline + public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) { + return writer.compareAndSet(prevWriter, newWriter); + } + + /** + * Interrupt the writing thread and wait until it dies. + * @throws IOException the waiting is interrupted + */ + @Override // ReplicaInPipeline + public void stopWriter(long xceiverStopTimeout) throws IOException { + while (true) { + Thread thread = writer.get(); + if ((thread == null) || (thread == Thread.currentThread()) || + (!thread.isAlive())) { + if (writer.compareAndSet(thread, null)) { + return; // Done + } + // The writer changed. Go back to the start of the loop and attempt to + // stop the new writer. + continue; + } + thread.interrupt(); + try { + thread.join(xceiverStopTimeout); + if (thread.isAlive()) { + // Our thread join timed out. + final String msg = "Join on writer thread " + thread + " timed out"; + DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(thread)); + throw new IOException(msg); + } + } catch (InterruptedException e) { + throw new IOException("Waiting for writer thread is interrupted."); + } + } + } + + @Override // Object + public int hashCode() { + return super.hashCode(); + } + + @Override // ReplicaInPipeline + public ReplicaOutputStreams createStreams(boolean isCreate, + DataChecksum requestedChecksum) throws IOException { + File blockFile = getBlockFile(); + File metaFile = getMetaFile(); + if (DataNode.LOG.isDebugEnabled()) { + DataNode.LOG.debug("writeTo blockfile is " + blockFile + + " of size " + blockFile.length()); + DataNode.LOG.debug("writeTo metafile is " + metaFile + + " of size " + metaFile.length()); + } + long blockDiskSize = 0L; + long crcDiskSize = 0L; + + // the checksum that should actually be used -- this + // may differ from requestedChecksum for appends. + final DataChecksum checksum; + + RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw"); + + if (!isCreate) { + // For append or recovery, we must enforce the existing checksum. + // Also, verify that the file has correct lengths, etc. + boolean checkedMeta = false; + try { + BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF); + checksum = header.getChecksum(); + + if (checksum.getBytesPerChecksum() != + requestedChecksum.getBytesPerChecksum()) { + throw new IOException("Client requested checksum " + + requestedChecksum + " when appending to an existing block " + + "with different chunk size: " + checksum); + } + + int bytesPerChunk = checksum.getBytesPerChecksum(); + int checksumSize = checksum.getChecksumSize(); + + blockDiskSize = bytesOnDisk; + crcDiskSize = BlockMetadataHeader.getHeaderSize() + + (blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize; + if (blockDiskSize > 0 && + (blockDiskSize > blockFile.length() || + crcDiskSize>metaFile.length())) { + throw new IOException("Corrupted block: " + this); + } + checkedMeta = true; + } finally { + if (!checkedMeta) { + // clean up in case of exceptions. + IOUtils.closeStream(metaRAF); + } + } + } else { + // for create, we can use the requested checksum + checksum = requestedChecksum; + } + + FileOutputStream blockOut = null; + FileOutputStream crcOut = null; + try { + blockOut = new FileOutputStream( + new RandomAccessFile(blockFile, "rw").getFD()); + crcOut = new FileOutputStream(metaRAF.getFD()); + if (!isCreate) { + blockOut.getChannel().position(blockDiskSize); + crcOut.getChannel().position(crcDiskSize); + } + return new ReplicaOutputStreams(blockOut, crcOut, checksum, + getVolume().isTransientStorage()); + } catch (IOException e) { + IOUtils.closeStream(blockOut); + IOUtils.closeStream(metaRAF); + throw e; + } + } + + @Override + public OutputStream createRestartMetaStream() throws IOException { + File blockFile = getBlockFile(); + File restartMeta = new File(blockFile.getParent() + + File.pathSeparator + "." + blockFile.getName() + ".restart"); + if (restartMeta.exists() && !restartMeta.delete()) { + DataNode.LOG.warn("Failed to delete restart meta file: " + + restartMeta.getPath()); + } + return new FileOutputStream(restartMeta); + } + + @Override + public String toString() { + return super.toString() + + "\n bytesAcked=" + bytesAcked + + "\n bytesOnDisk=" + bytesOnDisk; + } + + @Override + public ReplicaInfo getOriginalReplica() { + throw new UnsupportedOperationException("Replica of type " + getState() + + " does not support getOriginalReplica"); + } + + @Override + public long getRecoveryID() { + throw new UnsupportedOperationException("Replica of type " + getState() + + " does not support getRecoveryID"); + } + + @Override + public void setRecoveryID(long recoveryId) { + throw new UnsupportedOperationException("Replica of type " + getState() + + " does not support setRecoveryID"); + } + + @Override + public ReplicaRecoveryInfo createInfo(){ + throw new UnsupportedOperationException("Replica of type " + getState() + + " does not support createInfo"); + } + + public void moveReplicaFrom(ReplicaInfo oldReplicaInfo, File newBlkFile) + throws IOException { + + if (!(oldReplicaInfo instanceof LocalReplica)) { + throw new IOException("The source replica with blk id " + + oldReplicaInfo.getBlockId() + + " should be derived from LocalReplica"); + } + + LocalReplica localReplica = (LocalReplica) oldReplicaInfo; + + File oldmeta = localReplica.getMetaFile(); + File newmeta = getMetaFile(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Renaming " + oldmeta + " to " + newmeta); + } + try { + NativeIO.renameTo(oldmeta, newmeta); + } catch (IOException e) { + throw new IOException("Block " + oldReplicaInfo + " reopen failed. " + + " Unable to move meta file " + oldmeta + + " to rbw dir " + newmeta, e); + } + + File blkfile = localReplica.getBlockFile(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Renaming " + blkfile + " to " + newBlkFile + + ", file length=" + blkfile.length()); + } + try { + NativeIO.renameTo(blkfile, newBlkFile); + } catch (IOException e) { + try { + NativeIO.renameTo(newmeta, oldmeta); + } catch (IOException ex) { + LOG.warn("Cannot move meta file " + newmeta + + "back to the finalized directory " + oldmeta, ex); + } + throw new IOException("Block " + oldReplicaInfo + " reopen failed. " + + " Unable to move block file " + blkfile + + " to rbw dir " + newBlkFile, e); + } + } + + @Override // ReplicaInPipeline + public ReplicaInfo getReplicaInfo() { + return this; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java index 4a89493..262533e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java @@ -27,9 +27,9 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; * Those are the replicas that * are created in a pipeline initiated by a dfs client. */ -public class ReplicaBeingWritten extends ReplicaInPipeline { +public class ReplicaBeingWritten extends LocalReplicaInPipeline { /** - * Constructor for a zero length replica + * Constructor for a zero length replica. * @param blockId block id * @param genStamp replica generation stamp * @param vol volume where replica is located @@ -37,25 +37,25 @@ public class ReplicaBeingWritten extends ReplicaInPipeline { * @param bytesToReserve disk space to reserve for this replica, based on * the estimated maximum block length. */ - public ReplicaBeingWritten(long blockId, long genStamp, + public ReplicaBeingWritten(long blockId, long genStamp, FsVolumeSpi vol, File dir, long bytesToReserve) { super(blockId, genStamp, vol, dir, bytesToReserve); } - + /** - * Constructor + * Constructor. * @param block a block * @param vol volume where replica is located * @param dir directory path where block and meta files are located * @param writer a thread that is writing to this replica */ - public ReplicaBeingWritten(Block block, + public ReplicaBeingWritten(Block block, FsVolumeSpi vol, File dir, Thread writer) { - super( block, vol, dir, writer); + super(block, vol, dir, writer); } /** - * Constructor + * Constructor. * @param blockId block id * @param len replica length * @param genStamp replica generation stamp http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java new file mode 100644 index 0000000..280aaa0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import java.io.File; + +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; + +/** + * This class is to be used as a builder for {@link ReplicaInfo} objects. + * The state of the replica is used to determine which object is instantiated. + */ +public class ReplicaBuilder { + + private ReplicaState state; + private long blockId; + private long genStamp; + private long length; + private FsVolumeSpi volume; + private File directoryUsed; + private long bytesToReserve; + private Thread writer; + private long recoveryId; + private Block block; + + private ReplicaInfo fromReplica; + + public ReplicaBuilder(ReplicaState state) { + volume = null; + writer = null; + block = null; + length = -1; + this.state = state; + } + + public ReplicaBuilder setState(ReplicaState state) { + this.state = state; + return this; + } + + public ReplicaBuilder setBlockId(long blockId) { + this.blockId = blockId; + return this; + } + + public ReplicaBuilder setGenerationStamp(long genStamp) { + this.genStamp = genStamp; + return this; + } + + public ReplicaBuilder setLength(long length) { + this.length = length; + return this; + } + + public ReplicaBuilder setFsVolume(FsVolumeSpi volume) { + this.volume = volume; + return this; + } + + public ReplicaBuilder setDirectoryToUse(File dir) { + this.directoryUsed = dir; + return this; + } + + public ReplicaBuilder setBytesToReserve(long bytesToReserve) { + this.bytesToReserve = bytesToReserve; + return this; + } + + public ReplicaBuilder setWriterThread(Thread writer) { + this.writer = writer; + return this; + } + + public ReplicaBuilder from(ReplicaInfo fromReplica) { + this.fromReplica = fromReplica; + return this; + } + + public ReplicaBuilder setRecoveryId(long recoveryId) { + this.recoveryId = recoveryId; + return this; + } + + public ReplicaBuilder setBlock(Block block) { + this.block = block; + return this; + } + + public LocalReplicaInPipeline buildLocalReplicaInPipeline() + throws IllegalArgumentException { + LocalReplicaInPipeline info = null; + switch(state) { + case RBW: + info = buildRBW(); + break; + case TEMPORARY: + info = buildTemporaryReplica(); + break; + default: + throw new IllegalArgumentException("Unknown replica state " + state); + } + return info; + } + + private LocalReplicaInPipeline buildRBW() throws IllegalArgumentException { + if (null != fromReplica && fromReplica.getState() == ReplicaState.RBW) { + return new ReplicaBeingWritten((ReplicaBeingWritten) fromReplica); + } else if (null != fromReplica) { + throw new IllegalArgumentException("Incompatible fromReplica " + + "state: " + fromReplica.getState()); + } else { + if (null != block) { + if (null == writer) { + throw new IllegalArgumentException("A valid writer is " + + "required for constructing a RBW from block " + + block.getBlockId()); + } + return new ReplicaBeingWritten(block, volume, directoryUsed, writer); + } else { + if (length != -1) { + return new ReplicaBeingWritten(blockId, length, genStamp, + volume, directoryUsed, writer, bytesToReserve); + } else { + return new ReplicaBeingWritten(blockId, genStamp, volume, + directoryUsed, bytesToReserve); + } + } + } + } + + private LocalReplicaInPipeline buildTemporaryReplica() + throws IllegalArgumentException { + if (null != fromReplica && + fromReplica.getState() == ReplicaState.TEMPORARY) { + return new LocalReplicaInPipeline((LocalReplicaInPipeline) fromReplica); + } else if (null != fromReplica) { + throw new IllegalArgumentException("Incompatible fromReplica " + + "state: " + fromReplica.getState()); + } else { + if (null != block) { + if (null == writer) { + throw new IllegalArgumentException("A valid writer is " + + "required for constructing a Replica from block " + + block.getBlockId()); + } + return new LocalReplicaInPipeline(block, volume, directoryUsed, + writer); + } else { + if (length != -1) { + return new LocalReplicaInPipeline(blockId, length, genStamp, + volume, directoryUsed, writer, bytesToReserve); + } else { + return new LocalReplicaInPipeline(blockId, genStamp, volume, + directoryUsed, bytesToReserve); + } + } + } + } + + private ReplicaInfo buildFinalizedReplica() throws IllegalArgumentException { + if (null != fromReplica && + fromReplica.getState() == ReplicaState.FINALIZED) { + return new FinalizedReplica((FinalizedReplica)fromReplica); + } else if (null != this.fromReplica) { + throw new IllegalArgumentException("Incompatible fromReplica " + + "state: " + fromReplica.getState()); + } else { + if (null != block) { + return new FinalizedReplica(block, volume, directoryUsed); + } else { + return new FinalizedReplica(blockId, length, genStamp, volume, + directoryUsed); + } + } + } + + private ReplicaInfo buildRWR() throws IllegalArgumentException { + + if (null != fromReplica && fromReplica.getState() == ReplicaState.RWR) { + return new ReplicaWaitingToBeRecovered( + (ReplicaWaitingToBeRecovered) fromReplica); + } else if (null != fromReplica){ + throw new IllegalArgumentException("Incompatible fromReplica " + + "state: " + fromReplica.getState()); + } else { + if (null != block) { + return new ReplicaWaitingToBeRecovered(block, volume, directoryUsed); + } else { + return new ReplicaWaitingToBeRecovered(blockId, length, genStamp, + volume, directoryUsed); + } + } + } + + private ReplicaInfo buildRUR() throws IllegalArgumentException { + if (null == fromReplica) { + throw new IllegalArgumentException( + "Missing a valid replica to recover from"); + } + if (null != writer || null != block) { + throw new IllegalArgumentException("Invalid state for " + + "recovering from replica with blk id " + + fromReplica.getBlockId()); + } + if (fromReplica.getState() == ReplicaState.RUR) { + return new ReplicaUnderRecovery((ReplicaUnderRecovery) fromReplica); + } else { + return new ReplicaUnderRecovery(fromReplica, recoveryId); + } + } + + public ReplicaInfo build() throws IllegalArgumentException { + ReplicaInfo info = null; + switch(this.state) { + case FINALIZED: + info = buildFinalizedReplica(); + break; + case RWR: + info = buildRWR(); + break; + case RUR: + info = buildRUR(); + break; + case RBW: + case TEMPORARY: + info = buildLocalReplicaInPipeline(); + break; + default: + throw new IllegalArgumentException("Unknown replica state " + state); + } + return info; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java index b563d7f..ddc9f9f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java @@ -27,11 +27,11 @@ import java.io.IOException; * the fs volume where this replica is located. */ public class ReplicaHandler implements Closeable { - private final ReplicaInPipelineInterface replica; + private final ReplicaInPipeline replica; private final FsVolumeReference volumeReference; public ReplicaHandler( - ReplicaInPipelineInterface replica, FsVolumeReference reference) { + ReplicaInPipeline replica, FsVolumeReference reference) { this.replica = replica; this.volumeReference = reference; } @@ -43,7 +43,7 @@ public class ReplicaHandler implements Closeable { } } - public ReplicaInPipelineInterface getReplica() { + public ReplicaInPipeline getReplica() { return replica; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java index 7326846..efa6ea6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java @@ -17,313 +17,91 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.io.RandomAccessFile; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DataChecksum; -import org.apache.hadoop.util.StringUtils; /** - * This class defines a replica in a pipeline, which - * includes a persistent replica being written to by a dfs client or - * a temporary replica being replicated by a source datanode or - * being copied for the balancing purpose. - * - * The base class implements a temporary replica + * This defines the interface of a replica in Pipeline that's being written to */ -public class ReplicaInPipeline extends ReplicaInfo - implements ReplicaInPipelineInterface { - private long bytesAcked; - private long bytesOnDisk; - private byte[] lastChecksum; - private AtomicReference<Thread> writer = new AtomicReference<Thread>(); - +public interface ReplicaInPipeline extends Replica { /** - * Bytes reserved for this replica on the containing volume. - * Based off difference between the estimated maximum block length and - * the bytes already written to this block. + * Set the number of bytes received + * @param bytesReceived number of bytes received */ - private long bytesReserved; - private final long originalBytesReserved; + void setNumBytes(long bytesReceived); /** - * Constructor for a zero length replica - * @param blockId block id - * @param genStamp replica generation stamp - * @param vol volume where replica is located - * @param dir directory path where block and meta files are located - * @param bytesToReserve disk space to reserve for this replica, based on - * the estimated maximum block length. + * Get the number of bytes acked + * @return the number of bytes acked */ - public ReplicaInPipeline(long blockId, long genStamp, - FsVolumeSpi vol, File dir, long bytesToReserve) { - this(blockId, 0L, genStamp, vol, dir, Thread.currentThread(), bytesToReserve); - } + long getBytesAcked(); /** - * Constructor - * @param block a block - * @param vol volume where replica is located - * @param dir directory path where block and meta files are located - * @param writer a thread that is writing to this replica + * Set the number bytes that have acked + * @param bytesAcked number bytes acked */ - ReplicaInPipeline(Block block, - FsVolumeSpi vol, File dir, Thread writer) { - this( block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(), - vol, dir, writer, 0L); - } + void setBytesAcked(long bytesAcked); /** - * Constructor - * @param blockId block id - * @param len replica length - * @param genStamp replica generation stamp - * @param vol volume where replica is located - * @param dir directory path where block and meta files are located - * @param writer a thread that is writing to this replica - * @param bytesToReserve disk space to reserve for this replica, based on - * the estimated maximum block length. + * Release any disk space reserved for this replica. */ - ReplicaInPipeline(long blockId, long len, long genStamp, - FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) { - super( blockId, len, genStamp, vol, dir); - this.bytesAcked = len; - this.bytesOnDisk = len; - this.writer.set(writer); - this.bytesReserved = bytesToReserve; - this.originalBytesReserved = bytesToReserve; - } + public void releaseAllBytesReserved(); /** - * Copy constructor. - * @param from where to copy from + * store the checksum for the last chunk along with the data length + * @param dataLength number of bytes on disk + * @param lastChecksum - checksum bytes for the last chunk */ - public ReplicaInPipeline(ReplicaInPipeline from) { - super(from); - this.bytesAcked = from.getBytesAcked(); - this.bytesOnDisk = from.getBytesOnDisk(); - this.writer.set(from.writer.get()); - this.bytesReserved = from.bytesReserved; - this.originalBytesReserved = from.originalBytesReserved; - } - - @Override - public long getVisibleLength() { - return -1; - } - - @Override //ReplicaInfo - public ReplicaState getState() { - return ReplicaState.TEMPORARY; - } + public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum); - @Override // ReplicaInPipelineInterface - public long getBytesAcked() { - return bytesAcked; - } + /** + * gets the last chunk checksum and the length of the block corresponding + * to that checksum + */ + public ChunkChecksum getLastChecksumAndDataLen(); - @Override // ReplicaInPipelineInterface - public void setBytesAcked(long bytesAcked) { - long newBytesAcked = bytesAcked - this.bytesAcked; - this.bytesAcked = bytesAcked; + /** + * Create output streams for writing to this replica, + * one for block file and one for CRC file + * + * @param isCreate if it is for creation + * @param requestedChecksum the checksum the writer would prefer to use + * @return output streams for writing + * @throws IOException if any error occurs + */ + public ReplicaOutputStreams createStreams(boolean isCreate, + DataChecksum requestedChecksum) throws IOException; - // Once bytes are ACK'ed we can release equivalent space from the - // volume's reservedForRbw count. We could have released it as soon - // as the write-to-disk completed but that would be inefficient. - getVolume().releaseReservedSpace(newBytesAcked); - bytesReserved -= newBytesAcked; - } + /** + * Create an output stream to write restart metadata in case of datanode + * shutting down for quick restart. + * + * @return output stream for writing. + * @throws IOException if any error occurs + */ + public OutputStream createRestartMetaStream() throws IOException; - @Override // ReplicaInPipelineInterface - public long getBytesOnDisk() { - return bytesOnDisk; - } - - @Override - public long getBytesReserved() { - return bytesReserved; - } + ReplicaInfo getReplicaInfo(); - @Override - public long getOriginalBytesReserved() { - return originalBytesReserved; - } - - @Override - public void releaseAllBytesReserved() { // ReplicaInPipelineInterface - getVolume().releaseReservedSpace(bytesReserved); - getVolume().releaseLockedMemory(bytesReserved); - bytesReserved = 0; - } - - @Override // ReplicaInPipelineInterface - public synchronized void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) { - this.bytesOnDisk = dataLength; - this.lastChecksum = lastChecksum; - } + /** + * Set the thread that is writing to this replica + * @param writer a thread writing to this replica + */ + void setWriter(Thread writer); - @Override // ReplicaInPipelineInterface - public synchronized ChunkChecksum getLastChecksumAndDataLen() { - return new ChunkChecksum(getBytesOnDisk(), lastChecksum); - } - - public void interruptThread() { - Thread thread = writer.get(); - if (thread != null && thread != Thread.currentThread() - && thread.isAlive()) { - thread.interrupt(); - } - } - - @Override // Object - public boolean equals(Object o) { - return super.equals(o); - } + void interruptThread(); /** * Attempt to set the writer to a new value. */ - public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) { - return writer.compareAndSet(prevWriter, newWriter); - } + boolean attemptToSetWriter(Thread prevWriter, Thread newWriter); /** - * Interrupt the writing thread and wait until it dies + * Interrupt the writing thread and wait until it dies. * @throws IOException the waiting is interrupted */ - public void stopWriter(long xceiverStopTimeout) throws IOException { - while (true) { - Thread thread = writer.get(); - if ((thread == null) || (thread == Thread.currentThread()) || - (!thread.isAlive())) { - if (writer.compareAndSet(thread, null) == true) { - return; // Done - } - // The writer changed. Go back to the start of the loop and attempt to - // stop the new writer. - continue; - } - thread.interrupt(); - try { - thread.join(xceiverStopTimeout); - if (thread.isAlive()) { - // Our thread join timed out. - final String msg = "Join on writer thread " + thread + " timed out"; - DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(thread)); - throw new IOException(msg); - } - } catch (InterruptedException e) { - throw new IOException("Waiting for writer thread is interrupted."); - } - } - } - - @Override // Object - public int hashCode() { - return super.hashCode(); - } - - @Override // ReplicaInPipelineInterface - public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum) throws IOException { - File blockFile = getBlockFile(); - File metaFile = getMetaFile(); - if (DataNode.LOG.isDebugEnabled()) { - DataNode.LOG.debug("writeTo blockfile is " + blockFile + - " of size " + blockFile.length()); - DataNode.LOG.debug("writeTo metafile is " + metaFile + - " of size " + metaFile.length()); - } - long blockDiskSize = 0L; - long crcDiskSize = 0L; - - // the checksum that should actually be used -- this - // may differ from requestedChecksum for appends. - final DataChecksum checksum; - - RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw"); - - if (!isCreate) { - // For append or recovery, we must enforce the existing checksum. - // Also, verify that the file has correct lengths, etc. - boolean checkedMeta = false; - try { - BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF); - checksum = header.getChecksum(); - - if (checksum.getBytesPerChecksum() != - requestedChecksum.getBytesPerChecksum()) { - throw new IOException("Client requested checksum " + - requestedChecksum + " when appending to an existing block " + - "with different chunk size: " + checksum); - } - - int bytesPerChunk = checksum.getBytesPerChecksum(); - int checksumSize = checksum.getChecksumSize(); - - blockDiskSize = bytesOnDisk; - crcDiskSize = BlockMetadataHeader.getHeaderSize() + - (blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize; - if (blockDiskSize>0 && - (blockDiskSize>blockFile.length() || crcDiskSize>metaFile.length())) { - throw new IOException("Corrupted block: " + this); - } - checkedMeta = true; - } finally { - if (!checkedMeta) { - // clean up in case of exceptions. - IOUtils.closeStream(metaRAF); - } - } - } else { - // for create, we can use the requested checksum - checksum = requestedChecksum; - } - - FileOutputStream blockOut = null; - FileOutputStream crcOut = null; - try { - blockOut = new FileOutputStream( - new RandomAccessFile( blockFile, "rw" ).getFD() ); - crcOut = new FileOutputStream(metaRAF.getFD() ); - if (!isCreate) { - blockOut.getChannel().position(blockDiskSize); - crcOut.getChannel().position(crcDiskSize); - } - return new ReplicaOutputStreams(blockOut, crcOut, checksum, - getVolume().isTransientStorage()); - } catch (IOException e) { - IOUtils.closeStream(blockOut); - IOUtils.closeStream(metaRAF); - throw e; - } - } - - @Override - public OutputStream createRestartMetaStream() throws IOException { - File blockFile = getBlockFile(); - File restartMeta = new File(blockFile.getParent() + - File.pathSeparator + "." + blockFile.getName() + ".restart"); - if (restartMeta.exists() && !restartMeta.delete()) { - DataNode.LOG.warn("Failed to delete restart meta file: " + - restartMeta.getPath()); - } - return new FileOutputStream(restartMeta); - } - - @Override - public String toString() { - return super.toString() - + "\n bytesAcked=" + bytesAcked - + "\n bytesOnDisk=" + bytesOnDisk; - } + void stopWriter(long xceiverStopTimeout) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java deleted file mode 100644 index ef9f3e2..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.datanode; - -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; -import org.apache.hadoop.util.DataChecksum; - -/** - * This defines the interface of a replica in Pipeline that's being written to - */ -public interface ReplicaInPipelineInterface extends Replica { - /** - * Set the number of bytes received - * @param bytesReceived number of bytes received - */ - void setNumBytes(long bytesReceived); - - /** - * Get the number of bytes acked - * @return the number of bytes acked - */ - long getBytesAcked(); - - /** - * Set the number bytes that have acked - * @param bytesAcked number bytes acked - */ - void setBytesAcked(long bytesAcked); - - /** - * Release any disk space reserved for this replica. - */ - public void releaseAllBytesReserved(); - - /** - * store the checksum for the last chunk along with the data length - * @param dataLength number of bytes on disk - * @param lastChecksum - checksum bytes for the last chunk - */ - public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum); - - /** - * gets the last chunk checksum and the length of the block corresponding - * to that checksum - */ - public ChunkChecksum getLastChecksumAndDataLen(); - - /** - * Create output streams for writing to this replica, - * one for block file and one for CRC file - * - * @param isCreate if it is for creation - * @param requestedChecksum the checksum the writer would prefer to use - * @return output streams for writing - * @throws IOException if any error occurs - */ - public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum) throws IOException; - - /** - * Create an output stream to write restart metadata in case of datanode - * shutting down for quick restart. - * - * @return output stream for writing. - * @throws IOException if any error occurs - */ - public OutputStream createRestartMetaStream() throws IOException; -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org