http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java index dc63238..d3006c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java @@ -45,6 +45,10 @@ abstract public class ReplicaInfo extends Block /** volume where the replica belongs. */ private FsVolumeSpi volume; + /** This is used by some tests and FsDatasetUtil#computeChecksum. */ + private static final FileIoProvider DEFAULT_FILE_IO_PROVIDER = + new FileIoProvider(null); + /** * Constructor * @param vol volume where replica is located @@ -64,7 +68,18 @@ abstract public class ReplicaInfo extends Block public FsVolumeSpi getVolume() { return volume; } - + + /** + * Get the {@link FileIoProvider} for disk IO operations. + */ + public FileIoProvider getFileIoProvider() { + // In tests and when invoked via FsDatasetUtil#computeChecksum, the + // target volume for this replica may be unknown and hence null. + // Use the DEFAULT_FILE_IO_PROVIDER with no-op hooks. + return (volume != null) ? volume.getFileIoProvider() + : DEFAULT_FILE_IO_PROVIDER; + } + /** * Set the volume where this replica is located on disk. */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java index a11a207..4947ecf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.checker.Checkable; @@ -418,4 +419,6 @@ public interface FsVolumeSpi */ class VolumeCheckContext { } + + FileIoProvider getFileIoProvider(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java index 54d0e96..f40315a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java @@ -24,8 +24,8 @@ import java.io.InputStream; import java.io.IOException; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIOException; import org.slf4j.Logger; @@ -38,12 +38,15 @@ public class ReplicaInputStreams implements Closeable { private InputStream dataIn; private InputStream checksumIn; private FsVolumeReference volumeRef; + private final FileIoProvider fileIoProvider; private FileDescriptor dataInFd = null; /** Create an object with a data input stream and a checksum input stream. */ - public ReplicaInputStreams(InputStream dataStream, - InputStream checksumStream, FsVolumeReference volumeRef) { + public ReplicaInputStreams( + InputStream dataStream, InputStream checksumStream, + FsVolumeReference volumeRef, FileIoProvider fileIoProvider) { this.volumeRef = volumeRef; + this.fileIoProvider = fileIoProvider; this.dataIn = dataStream; this.checksumIn = checksumStream; if (dataIn instanceof FileInputStream) { @@ -103,7 +106,7 @@ public class ReplicaInputStreams implements Closeable { public void dropCacheBehindReads(String identifier, long offset, long len, int flags) throws NativeIOException { assert this.dataInFd != null : "null dataInFd!"; - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( + fileIoProvider.posixFadvise(getVolumeRef().getVolume(), identifier, dataInFd, offset, len, flags); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java index a66847a..1614ba2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java @@ -24,11 +24,10 @@ import java.io.OutputStream; import java.io.IOException; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIOException; import org.apache.hadoop.util.DataChecksum; -import org.apache.hadoop.util.Time; import org.slf4j.Logger; /** @@ -43,21 +42,22 @@ public class ReplicaOutputStreams implements Closeable { /** Stream to checksum. */ private final OutputStream checksumOut; private final DataChecksum checksum; - private final boolean isTransientStorage; - private final long slowLogThresholdMs; + private final FsVolumeSpi volume; + private final FileIoProvider fileIoProvider; /** * Create an object with a data output stream, a checksum output stream * and a checksum. */ - public ReplicaOutputStreams(OutputStream dataOut, - OutputStream checksumOut, DataChecksum checksum, - boolean isTransientStorage, long slowLogThresholdMs) { + public ReplicaOutputStreams( + OutputStream dataOut, OutputStream checksumOut, DataChecksum checksum, + FsVolumeSpi volume, FileIoProvider fileIoProvider) { + this.dataOut = dataOut; this.checksum = checksum; - this.slowLogThresholdMs = slowLogThresholdMs; - this.isTransientStorage = isTransientStorage; this.checksumOut = checksumOut; + this.volume = volume; + this.fileIoProvider = fileIoProvider; try { if (this.dataOut instanceof FileOutputStream) { @@ -93,7 +93,7 @@ public class ReplicaOutputStreams implements Closeable { /** @return is writing to a transient storage? */ public boolean isTransientStorage() { - return isTransientStorage; + return volume.isTransientStorage(); } @Override @@ -112,7 +112,7 @@ public class ReplicaOutputStreams implements Closeable { */ public void syncDataOut() throws IOException { if (dataOut instanceof FileOutputStream) { - sync((FileOutputStream)dataOut); + fileIoProvider.sync(volume, (FileOutputStream) dataOut); } } @@ -121,7 +121,7 @@ public class ReplicaOutputStreams implements Closeable { */ public void syncChecksumOut() throws IOException { if (checksumOut instanceof FileOutputStream) { - sync((FileOutputStream)checksumOut); + fileIoProvider.sync(volume, (FileOutputStream) checksumOut); } } @@ -129,60 +129,34 @@ public class ReplicaOutputStreams implements Closeable { * Flush the data stream if it supports it. */ public void flushDataOut() throws IOException { - flush(dataOut); + if (dataOut != null) { + fileIoProvider.flush(volume, dataOut); + } } /** * Flush the checksum stream if it supports it. */ public void flushChecksumOut() throws IOException { - flush(checksumOut); - } - - private void flush(OutputStream dos) throws IOException { - long begin = Time.monotonicNow(); - dos.flush(); - long duration = Time.monotonicNow() - begin; - LOG.trace("ReplicaOutputStreams#flush takes {} ms.", duration); - if (duration > slowLogThresholdMs) { - LOG.warn("Slow flush took {} ms (threshold={} ms)", duration, - slowLogThresholdMs); + if (checksumOut != null) { + fileIoProvider.flush(volume, checksumOut); } } - private void sync(FileOutputStream fos) throws IOException { - long begin = Time.monotonicNow(); - fos.getChannel().force(true); - long duration = Time.monotonicNow() - begin; - LOG.trace("ReplicaOutputStreams#sync takes {} ms.", duration); - if (duration > slowLogThresholdMs) { - LOG.warn("Slow fsync took {} ms (threshold={} ms)", duration, - slowLogThresholdMs); - } - } - - public long writeToDisk(byte[] b, int off, int len) throws IOException { - long begin = Time.monotonicNow(); + public void writeDataToDisk(byte[] b, int off, int len) + throws IOException { dataOut.write(b, off, len); - long duration = Time.monotonicNow() - begin; - LOG.trace("DatanodeIO#writeToDisk takes {} ms.", duration); - if (duration > slowLogThresholdMs) { - LOG.warn("Slow BlockReceiver write data to disk cost: {} ms " + - "(threshold={} ms)", duration, slowLogThresholdMs); - } - return duration; } public void syncFileRangeIfPossible(long offset, long nbytes, int flags) throws NativeIOException { - assert this.outFd != null : "null outFd!"; - NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, nbytes, flags); + fileIoProvider.syncFileRange( + volume, outFd, offset, nbytes, flags); } public void dropCacheBehindWrites(String identifier, long offset, long len, int flags) throws NativeIOException { - assert this.outFd != null : "null outFd!"; - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( - identifier, outFd, offset, len, flags); + fileIoProvider.posixFadvise( + volume, identifier, outFd, offset, len, flags); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 63e82f3..8273ebb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -32,13 +32,11 @@ import java.util.Iterator; import java.util.Scanner; import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CachingGetSpaceUsed; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.GetSpaceUsed; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; @@ -46,10 +44,10 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; -import org.apache.hadoop.hdfs.server.datanode.LocalReplica; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica; @@ -64,7 +62,6 @@ import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.Timer; import com.google.common.annotations.VisibleForTesting; -import com.google.common.io.Files; /** * A block pool slice represents a portion of a block pool stored on a volume. @@ -96,6 +93,7 @@ class BlockPoolSlice { private final long cachedDfsUsedCheckTime; private final Timer timer; private final int maxDataLength; + private final FileIoProvider fileIoProvider; // TODO:FEDERATION scalability issue - a thread per DU is needed private final GetSpaceUsed dfsUsage; @@ -113,6 +111,7 @@ class BlockPoolSlice { Configuration conf, Timer timer) throws IOException { this.bpid = bpid; this.volume = volume; + this.fileIoProvider = volume.getFileIoProvider(); this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); this.finalizedDir = new File( currentDir, DataStorage.STORAGE_DIR_FINALIZED); @@ -147,19 +146,14 @@ class BlockPoolSlice { // this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP); if (tmpDir.exists()) { - DataStorage.fullyDelete(tmpDir); + fileIoProvider.fullyDelete(volume, tmpDir); } this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW); - if (!rbwDir.mkdirs()) { // create rbw directory if not exist - if (!rbwDir.isDirectory()) { - throw new IOException("Mkdirs failed to create " + rbwDir.toString()); - } - } - if (!tmpDir.mkdirs()) { - if (!tmpDir.isDirectory()) { - throw new IOException("Mkdirs failed to create " + tmpDir.toString()); - } - } + + // create the rbw and tmp directories if they don't exist. + fileIoProvider.mkdirs(volume, rbwDir); + fileIoProvider.mkdirs(volume, tmpDir); + // Use cached value initially if available. Or the following call will // block until the initial du command completes. this.dfsUsage = new CachingGetSpaceUsed.Builder().setPath(bpDir) @@ -266,7 +260,7 @@ class BlockPoolSlice { */ void saveDfsUsed() { File outFile = new File(currentDir, DU_CACHE_FILE); - if (outFile.exists() && !outFile.delete()) { + if (!fileIoProvider.deleteWithExistsCheck(volume, outFile)) { FsDatasetImpl.LOG.warn("Failed to delete old dfsUsed file in " + outFile.getParent()); } @@ -277,7 +271,7 @@ class BlockPoolSlice { new FileOutputStream(outFile), "UTF-8")) { // mtime is written last, so that truncated writes won't be valid. out.write(Long.toString(used) + " " + Long.toString(timer.now())); - out.flush(); + fileIoProvider.flush(volume, out); } } catch (IOException ioe) { // If write failed, the volume might be bad. Since the cache file is @@ -292,7 +286,8 @@ class BlockPoolSlice { */ File createTmpFile(Block b) throws IOException { File f = new File(tmpDir, b.getBlockName()); - File tmpFile = DatanodeUtil.createTmpFile(b, f); + File tmpFile = DatanodeUtil.createFileWithExistsCheck( + volume, b, f, fileIoProvider); // If any exception during creation, its expected that counter will not be // incremented, So no need to decrement incrNumBlocks(); @@ -305,7 +300,8 @@ class BlockPoolSlice { */ File createRbwFile(Block b) throws IOException { File f = new File(rbwDir, b.getBlockName()); - File rbwFile = DatanodeUtil.createTmpFile(b, f); + File rbwFile = DatanodeUtil.createFileWithExistsCheck( + volume, b, f, fileIoProvider); // If any exception during creation, its expected that counter will not be // incremented, So no need to decrement incrNumBlocks(); @@ -314,11 +310,7 @@ class BlockPoolSlice { File addFinalizedBlock(Block b, ReplicaInfo replicaInfo) throws IOException { File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId()); - if (!blockDir.exists()) { - if (!blockDir.mkdirs()) { - throw new IOException("Failed to mkdirs " + blockDir); - } - } + fileIoProvider.mkdirsWithExistsCheck(volume, blockDir); File blockFile = FsDatasetImpl.moveBlockFiles(b, replicaInfo, blockDir); File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp()); if (dfsUsage instanceof CachingGetSpaceUsed) { @@ -340,9 +332,9 @@ class BlockPoolSlice { final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId); final File targetBlockFile = new File(blockDir, blockFile.getName()); final File targetMetaFile = new File(blockDir, metaFile.getName()); - FileUtils.moveFile(blockFile, targetBlockFile); + fileIoProvider.moveFile(volume, blockFile, targetBlockFile); FsDatasetImpl.LOG.info("Moved " + blockFile + " to " + targetBlockFile); - FileUtils.moveFile(metaFile, targetMetaFile); + fileIoProvider.moveFile(volume, metaFile, targetMetaFile); FsDatasetImpl.LOG.info("Moved " + metaFile + " to " + targetMetaFile); ReplicaInfo newReplicaInfo = @@ -394,16 +386,13 @@ class BlockPoolSlice { File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp); if (blockFile.exists()) { // If the original block file still exists, then no recovery is needed. - if (!unlinkedTmp.delete()) { + if (!fileIoProvider.delete(volume, unlinkedTmp)) { throw new IOException("Unable to cleanup unlinked tmp file " + unlinkedTmp); } return null; } else { - if (!unlinkedTmp.renameTo(blockFile)) { - throw new IOException("Unable to rename unlinked tmp file " + - unlinkedTmp); - } + fileIoProvider.rename(volume, unlinkedTmp, blockFile); return blockFile; } } @@ -416,7 +405,7 @@ class BlockPoolSlice { */ private int moveLazyPersistReplicasToFinalized(File source) throws IOException { - File files[] = FileUtil.listFiles(source); + File[] files = fileIoProvider.listFiles(volume, source); int numRecovered = 0; for (File file : files) { if (file.isDirectory()) { @@ -431,24 +420,25 @@ class BlockPoolSlice { if (blockFile.exists()) { - if (!targetDir.exists() && !targetDir.mkdirs()) { + try { + fileIoProvider.mkdirsWithExistsCheck(volume, targetDir); + } catch(IOException ioe) { LOG.warn("Failed to mkdirs " + targetDir); continue; } final File targetMetaFile = new File(targetDir, metaFile.getName()); try { - LocalReplica.rename(metaFile, targetMetaFile); + fileIoProvider.rename(volume, metaFile, targetMetaFile); } catch (IOException e) { LOG.warn("Failed to move meta file from " + metaFile + " to " + targetMetaFile, e); continue; - } final File targetBlockFile = new File(targetDir, blockFile.getName()); try { - LocalReplica.rename(blockFile, targetBlockFile); + fileIoProvider.rename(volume, blockFile, targetBlockFile); } catch (IOException e) { LOG.warn("Failed to move block file from " + blockFile + " to " + targetBlockFile, e); @@ -465,7 +455,7 @@ class BlockPoolSlice { } } - FileUtil.fullyDelete(source); + fileIoProvider.fullyDelete(volume, source); return numRecovered; } @@ -508,7 +498,7 @@ class BlockPoolSlice { loadRwr = false; } sc.close(); - if (!restartMeta.delete()) { + if (!fileIoProvider.delete(volume, restartMeta)) { FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " + restartMeta.getPath()); } @@ -568,7 +558,7 @@ class BlockPoolSlice { final RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized) throws IOException { - File files[] = FileUtil.listFiles(dir); + File[] files = fileIoProvider.listFiles(volume, dir); for (File file : files) { if (file.isDirectory()) { addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized); @@ -581,8 +571,9 @@ class BlockPoolSlice { continue; } } - if (!Block.isBlockFilename(file)) + if (!Block.isBlockFilename(file)) { continue; + } long genStamp = FsDatasetUtil.getGenerationStampFromFile( files, file); @@ -700,7 +691,8 @@ class BlockPoolSlice { return 0; } try (DataInputStream checksumIn = new DataInputStream( - new BufferedInputStream(new FileInputStream(metaFile), + new BufferedInputStream( + fileIoProvider.getFileInputStream(volume, metaFile), ioFileBufferSize))) { // read and handle the common header here. For now just a version final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( @@ -713,9 +705,10 @@ class BlockPoolSlice { if (numChunks == 0) { return 0; } - try (InputStream blockIn = new FileInputStream(blockFile); + try (InputStream blockIn = fileIoProvider.getFileInputStream( + volume, blockFile); ReplicaInputStreams ris = new ReplicaInputStreams(blockIn, - checksumIn, volume.obtainReference())) { + checksumIn, volume.obtainReference(), fileIoProvider)) { ris.skipChecksumFully((numChunks - 1) * checksumSize); long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum; ris.skipDataFully(lastChunkStartPos); @@ -734,7 +727,8 @@ class BlockPoolSlice { // truncate if extra bytes are present without CRC if (blockFile.length() > validFileLength) { try (RandomAccessFile blockRAF = - new RandomAccessFile(blockFile, "rw")) { + fileIoProvider.getRandomAccessFile( + volume, blockFile, "rw")) { // truncate blockFile blockRAF.setLength(validFileLength); } @@ -786,12 +780,14 @@ class BlockPoolSlice { } FileInputStream inputStream = null; try { - inputStream = new FileInputStream(replicaFile); + inputStream = fileIoProvider.getFileInputStream(volume, replicaFile); BlockListAsLongs blocksList = BlockListAsLongs.readFrom(inputStream, maxDataLength); - Iterator<BlockReportReplica> iterator = blocksList.iterator(); - while (iterator.hasNext()) { - BlockReportReplica replica = iterator.next(); + if (blocksList == null) { + return false; + } + + for (BlockReportReplica replica : blocksList) { switch (replica.getState()) { case FINALIZED: addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, true); @@ -828,7 +824,7 @@ class BlockPoolSlice { return false; } finally { - if (!replicaFile.delete()) { + if (!fileIoProvider.delete(volume, replicaFile)) { LOG.info("Failed to delete replica cache file: " + replicaFile.getPath()); } @@ -842,41 +838,29 @@ class BlockPoolSlice { blocksListToPersist.getNumberOfBlocks()== 0) { return; } - File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp"); - if (tmpFile.exists() && !tmpFile.delete()) { - LOG.warn("Failed to delete tmp replicas file in " + - tmpFile.getPath()); - return; - } - File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE); - if (replicaCacheFile.exists() && !replicaCacheFile.delete()) { - LOG.warn("Failed to delete replicas file in " + - replicaCacheFile.getPath()); + final File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp"); + final File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE); + if (!fileIoProvider.deleteWithExistsCheck(volume, tmpFile) || + !fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile)) { return; } FileOutputStream out = null; try { - out = new FileOutputStream(tmpFile); + out = fileIoProvider.getFileOutputStream(volume, tmpFile); blocksListToPersist.writeTo(out); out.close(); // Renaming the tmp file to replicas - Files.move(tmpFile, replicaCacheFile); + fileIoProvider.moveFile(volume, tmpFile, replicaCacheFile); } catch (Exception e) { // If write failed, the volume might be bad. Since the cache file is // not critical, log the error, delete both the files (tmp and cache) // and continue. LOG.warn("Failed to write replicas to cache ", e); - if (replicaCacheFile.exists() && !replicaCacheFile.delete()) { - LOG.warn("Failed to delete replicas file: " + - replicaCacheFile.getPath()); - } + fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile); } finally { IOUtils.closeStream(out); - if (tmpFile.exists() && !tmpFile.delete()) { - LOG.warn("Failed to delete tmp file in " + - tmpFile.getPath()); - } + fileIoProvider.deleteWithExistsCheck(volume, tmpFile); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java index 97dcf8d..416609d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java @@ -272,8 +272,10 @@ class FsDatasetAsyncDiskService { } File trashDirFile = new File(trashDirectory); - if (!trashDirFile.exists() && !trashDirFile.mkdirs()) { - LOG.error("Failed to create trash directory " + trashDirectory); + try { + volume.getFileIoProvider().mkdirsWithExistsCheck( + volume, trashDirFile); + } catch (IOException e) { return false; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 6065df2..35561cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -21,6 +21,7 @@ import java.io.BufferedOutputStream; import java.io.DataOutputStream; import java.io.EOFException; import java.io.File; +import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; @@ -57,6 +58,7 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; @@ -418,6 +420,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { .setDataset(this) .setStorageID(sd.getStorageUuid()) .setStorageDirectory(sd) + .setFileIoProvider(datanode.getFileIoProvider()) .setConf(this.conf) .build(); FsVolumeReference ref = fsVolume.obtainReference(); @@ -437,6 +440,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { .setDataset(this) .setStorageID(storageUuid) .setStorageDirectory(sd) + .setFileIoProvider(datanode.getFileIoProvider()) .setConf(conf) .build(); } @@ -818,7 +822,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { InputStream blockInStream = info.getDataInputStream(blkOffset); try { InputStream metaInStream = info.getMetadataInputStream(metaOffset); - return new ReplicaInputStreams(blockInStream, metaInStream, ref); + return new ReplicaInputStreams( + blockInStream, metaInStream, ref, datanode.getFileIoProvider()); } catch (IOException e) { IOUtils.cleanup(null, blockInStream); throw e; @@ -1027,9 +1032,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { static void computeChecksum(ReplicaInfo srcReplica, File dstMeta, int smallBufferSize, final Configuration conf) throws IOException { - File srcMeta = new File(srcReplica.getMetadataURI()); - final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta, - DFSUtilClient.getIoFileBufferSize(conf)); + final File srcMeta = new File(srcReplica.getMetadataURI()); + + DataChecksum checksum; + try (FileInputStream fis = + srcReplica.getFileIoProvider().getFileInputStream( + srcReplica.getVolume(), srcMeta)) { + checksum = BlockMetadataHeader.readDataChecksum( + fis, DFSUtilClient.getIoFileBufferSize(conf), srcMeta); + } + final byte[] data = new byte[1 << 16]; final byte[] crcs = new byte[checksum.getChecksumSize(data.length)]; @@ -2161,16 +2173,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { return; } - final long diskGS = diskMetaFile != null && diskMetaFile.exists() ? + final FileIoProvider fileIoProvider = datanode.getFileIoProvider(); + final boolean diskMetaFileExists = diskMetaFile != null && + fileIoProvider.exists(vol, diskMetaFile); + final boolean diskFileExists = diskFile != null && + fileIoProvider.exists(vol, diskFile); + + final long diskGS = diskMetaFileExists ? Block.getGenerationStamp(diskMetaFile.getName()) : - HdfsConstants.GRANDFATHER_GENERATION_STAMP; + HdfsConstants.GRANDFATHER_GENERATION_STAMP; - if (diskFile == null || !diskFile.exists()) { + if (!diskFileExists) { if (memBlockInfo == null) { // Block file does not exist and block does not exist in memory // If metadata file exists then delete it - if (diskMetaFile != null && diskMetaFile.exists() - && diskMetaFile.delete()) { + if (diskMetaFileExists && fileIoProvider.delete(vol, diskMetaFile)) { LOG.warn("Deleted a metadata file without a block " + diskMetaFile.getAbsolutePath()); } @@ -2186,8 +2203,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { LOG.warn("Removed block " + blockId + " from memory with missing block file on the disk"); // Finally remove the metadata file - if (diskMetaFile != null && diskMetaFile.exists() - && diskMetaFile.delete()) { + if (diskMetaFileExists && fileIoProvider.delete(vol, diskMetaFile)) { LOG.warn("Deleted a metadata file for the deleted block " + diskMetaFile.getAbsolutePath()); } @@ -2223,7 +2239,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { // Compare block files if (memBlockInfo.blockDataExists()) { if (memBlockInfo.getBlockURI().compareTo(diskFile.toURI()) != 0) { - if (diskMetaFile.exists()) { + if (diskMetaFileExists) { if (memBlockInfo.metadataExists()) { // We have two sets of block+meta files. Decide which one to // keep. @@ -2239,7 +2255,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { memBlockInfo, diskBlockInfo, volumeMap); } } else { - if (!diskFile.delete()) { + if (!fileIoProvider.delete(vol, diskFile)) { LOG.warn("Failed to delete " + diskFile); } } @@ -2278,8 +2294,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { // as the block file, then use the generation stamp from it try { File memFile = new File(memBlockInfo.getBlockURI()); - long gs = diskMetaFile != null && diskMetaFile.exists() - && diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS + long gs = diskMetaFileExists && + diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS : HdfsConstants.GRANDFATHER_GENERATION_STAMP; LOG.warn("Updating generation stamp for block " + blockId http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java index 563f66a..32759c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.File; +import java.io.FileDescriptor; import java.io.FileInputStream; import java.io.FilenameFilter; import java.io.IOException; @@ -80,7 +81,7 @@ public class FsDatasetUtil { return matches[0]; } - public static FileInputStream openAndSeek(File file, long offset) + public static FileDescriptor openAndSeek(File file, long offset) throws IOException { RandomAccessFile raf = null; try { @@ -88,7 +89,7 @@ public class FsDatasetUtil { if (offset > 0) { raf.seek(offset); } - return new FileInputStream(raf.getFD()); + return raf.getFD(); } catch(IOException ioe) { IOUtils.cleanup(null, raf); throw ioe; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index c317715..74ee063 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -19,14 +19,12 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.BufferedWriter; import java.io.File; -import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.RandomAccessFile; import java.net.URI; import java.nio.channels.ClosedChannelException; -import java.nio.file.Files; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.Collections; @@ -46,8 +44,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.DF; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; @@ -75,7 +73,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.CloseableReferenceCount; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.StringUtils; @@ -132,6 +129,7 @@ public class FsVolumeImpl implements FsVolumeSpi { // limit the visible capacity for tests. If negative, then we just // query from the filesystem. protected volatile long configuredCapacity; + private final FileIoProvider fileIoProvider; /** * Per-volume worker pool that processes new blocks to cache. @@ -141,8 +139,9 @@ public class FsVolumeImpl implements FsVolumeSpi { */ protected ThreadPoolExecutor cacheExecutor; - FsVolumeImpl(FsDatasetImpl dataset, String storageID, StorageDirectory sd, - Configuration conf) throws IOException { + FsVolumeImpl( + FsDatasetImpl dataset, String storageID, StorageDirectory sd, + FileIoProvider fileIoProvider, Configuration conf) throws IOException { if (sd.getStorageLocation() == null) { throw new IOException("StorageLocation specified for storage directory " + @@ -162,6 +161,7 @@ public class FsVolumeImpl implements FsVolumeSpi { DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT)); this.configuredCapacity = -1; this.conf = conf; + this.fileIoProvider = fileIoProvider; cacheExecutor = initializeCacheExecutor(parent); } @@ -664,8 +664,8 @@ public class FsVolumeImpl implements FsVolumeSpi { */ private String getNextSubDir(String prev, File dir) throws IOException { - List<String> children = - IOUtils.listDirectory(dir, SubdirFilter.INSTANCE); + List<String> children = fileIoProvider.listDirectory( + FsVolumeImpl.this, dir, SubdirFilter.INSTANCE); cache = null; cacheMs = 0; if (children.size() == 0) { @@ -718,8 +718,8 @@ public class FsVolumeImpl implements FsVolumeSpi { } File dir = Paths.get(bpidDir.getAbsolutePath(), "current", "finalized", state.curFinalizedDir, state.curFinalizedSubDir).toFile(); - List<String> entries = - IOUtils.listDirectory(dir, BlockFileFilter.INSTANCE); + List<String> entries = fileIoProvider.listDirectory( + FsVolumeImpl.this, dir, BlockFileFilter.INSTANCE); if (entries.size() == 0) { entries = null; } else { @@ -839,19 +839,18 @@ public class FsVolumeImpl implements FsVolumeSpi { public void save() throws IOException { state.lastSavedMs = Time.now(); boolean success = false; - try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter( - new FileOutputStream(getTempSaveFile(), false), "UTF-8"))) { + try (BufferedWriter writer = new BufferedWriter( + new OutputStreamWriter(fileIoProvider.getFileOutputStream( + FsVolumeImpl.this, getTempSaveFile()), "UTF-8"))) { WRITER.writeValue(writer, state); success = true; } finally { if (!success) { - if (getTempSaveFile().delete()) { - LOG.debug("save({}, {}): error deleting temporary file.", - storageID, bpid); - } + fileIoProvider.delete(FsVolumeImpl.this, getTempSaveFile()); } } - Files.move(getTempSaveFile().toPath(), getSaveFile().toPath(), + fileIoProvider.move(FsVolumeImpl.this, + getTempSaveFile().toPath(), getSaveFile().toPath(), StandardCopyOption.ATOMIC_MOVE); if (LOG.isTraceEnabled()) { LOG.trace("save({}, {}): saved {}", storageID, bpid, @@ -1042,11 +1041,12 @@ public class FsVolumeImpl implements FsVolumeSpi { File finalizedDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_FINALIZED); File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW); - if (finalizedDir.exists() && !DatanodeUtil.dirNoFilesRecursive( - finalizedDir)) { + if (fileIoProvider.exists(this, finalizedDir) && + !DatanodeUtil.dirNoFilesRecursive(this, finalizedDir, fileIoProvider)) { return false; } - if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) { + if (fileIoProvider.exists(this, rbwDir) && + fileIoProvider.list(this, rbwDir).length != 0) { return false; } return true; @@ -1067,35 +1067,38 @@ public class FsVolumeImpl implements FsVolumeSpi { DataStorage.STORAGE_DIR_LAZY_PERSIST); File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW); if (force) { - DataStorage.fullyDelete(bpDir); + fileIoProvider.fullyDelete(this, bpDir); } else { - if (!rbwDir.delete()) { + if (!fileIoProvider.delete(this, rbwDir)) { throw new IOException("Failed to delete " + rbwDir); } - if (!DatanodeUtil.dirNoFilesRecursive(finalizedDir) || - !FileUtil.fullyDelete(finalizedDir)) { + if (!DatanodeUtil.dirNoFilesRecursive( + this, finalizedDir, fileIoProvider) || + !fileIoProvider.fullyDelete( + this, finalizedDir)) { throw new IOException("Failed to delete " + finalizedDir); } if (lazypersistDir.exists() && - ((!DatanodeUtil.dirNoFilesRecursive(lazypersistDir) || - !FileUtil.fullyDelete(lazypersistDir)))) { + ((!DatanodeUtil.dirNoFilesRecursive( + this, lazypersistDir, fileIoProvider) || + !fileIoProvider.fullyDelete(this, lazypersistDir)))) { throw new IOException("Failed to delete " + lazypersistDir); } - DataStorage.fullyDelete(tmpDir); - for (File f : FileUtil.listFiles(bpCurrentDir)) { - if (!f.delete()) { + fileIoProvider.fullyDelete(this, tmpDir); + for (File f : fileIoProvider.listFiles(this, bpCurrentDir)) { + if (!fileIoProvider.delete(this, f)) { throw new IOException("Failed to delete " + f); } } - if (!bpCurrentDir.delete()) { + if (!fileIoProvider.delete(this, bpCurrentDir)) { throw new IOException("Failed to delete " + bpCurrentDir); } - for (File f : FileUtil.listFiles(bpDir)) { - if (!f.delete()) { + for (File f : fileIoProvider.listFiles(this, bpDir)) { + if (!fileIoProvider.delete(this, f)) { throw new IOException("Failed to delete " + f); } } - if (!bpDir.delete()) { + if (!fileIoProvider.delete(this, bpDir)) { throw new IOException("Failed to delete " + bpDir); } } @@ -1118,7 +1121,10 @@ public class FsVolumeImpl implements FsVolumeSpi { private byte[] loadLastPartialChunkChecksum( File blockFile, File metaFile) throws IOException { - DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum(); + // readHeader closes the temporary FileInputStream. + DataChecksum dcs = BlockMetadataHeader + .readHeader(fileIoProvider.getFileInputStream(this, metaFile)) + .getChecksum(); final int checksumSize = dcs.getChecksumSize(); final long onDiskLen = blockFile.length(); final int bytesPerChecksum = dcs.getBytesPerChecksum(); @@ -1132,7 +1138,8 @@ public class FsVolumeImpl implements FsVolumeSpi { int offsetInChecksum = BlockMetadataHeader.getHeaderSize() + (int)(onDiskLen / bytesPerChecksum * checksumSize); byte[] lastChecksum = new byte[checksumSize]; - try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) { + try (RandomAccessFile raf = fileIoProvider.getRandomAccessFile( + this, metaFile, "r")) { raf.seek(offsetInChecksum); raf.read(lastChecksum, 0, checksumSize); } @@ -1246,8 +1253,8 @@ public class FsVolumeImpl implements FsVolumeSpi { copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId); File blockFile = copiedReplicaFiles[1]; File metaFile = copiedReplicaFiles[0]; - LocalReplica.truncateBlock(blockFile, metaFile, - rur.getNumBytes(), newlength); + LocalReplica.truncateBlock(rur.getVolume(), blockFile, metaFile, + rur.getNumBytes(), newlength, fileIoProvider); LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW) .setBlockId(newBlockId) @@ -1283,6 +1290,11 @@ public class FsVolumeImpl implements FsVolumeSpi { getFinalizedDir(bpid), report, reportCompiler); } + @Override + public FileIoProvider getFileIoProvider() { + return fileIoProvider; + } + private LinkedList<ScanInfo> compileReport(File bpFinalizedDir, File dir, LinkedList<ScanInfo> report, ReportCompiler reportCompiler) throws InterruptedException { @@ -1291,7 +1303,8 @@ public class FsVolumeImpl implements FsVolumeSpi { List <String> fileNames; try { - fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE); + fileNames = fileIoProvider.listDirectory( + this, dir, BlockDirFilter.INSTANCE); } catch (IOException ioe) { LOG.warn("Exception occured while compiling report: ", ioe); // Initiate a check on disk failure. http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java index a1f7e91..5371eda 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; +import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; /** * This class is to be used as a builder for {@link FsVolumeImpl} objects. @@ -31,6 +32,7 @@ public class FsVolumeImplBuilder { private String storageID; private StorageDirectory sd; private Configuration conf; + private FileIoProvider fileIoProvider; public FsVolumeImplBuilder() { dataset = null; @@ -59,7 +61,15 @@ public class FsVolumeImplBuilder { return this; } + FsVolumeImplBuilder setFileIoProvider(FileIoProvider fileIoProvider) { + this.fileIoProvider = fileIoProvider; + return this; + } + FsVolumeImpl build() throws IOException { - return new FsVolumeImpl(dataset, storageID, sd, conf); + return new FsVolumeImpl( + dataset, storageID, sd, + fileIoProvider != null ? fileIoProvider : new FileIoProvider(null), + conf); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java index e963d41..20cec6a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java @@ -701,7 +701,7 @@ public class TestFileAppend{ ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaHandler.getReplica(); ReplicaOutputStreams - outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM, 300); + outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM); OutputStream dataOutput = outputStreams.getDataOut(); byte[] appendBytes = new byte[1]; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index ae52905..a0041dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -122,6 +122,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { static final byte[] nullCrcFileData; private final AutoCloseableLock datasetLock; + private final FileIoProvider fileIoProvider; static { DataChecksum checksum = DataChecksum.newDataChecksum( @@ -260,7 +261,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { @Override synchronized public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum, long slowLogThresholdMs) + DataChecksum requestedChecksum) throws IOException { if (finalized) { throw new IOException("Trying to write to a finalized replica " @@ -268,7 +269,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { } else { SimulatedOutputStream crcStream = new SimulatedOutputStream(); return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum, - volume.isTransientStorage(), slowLogThresholdMs); + volume, fileIoProvider); } } @@ -474,9 +475,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { static class SimulatedVolume implements FsVolumeSpi { private final SimulatedStorage storage; + private final FileIoProvider fileIoProvider; - SimulatedVolume(final SimulatedStorage storage) { + SimulatedVolume(final SimulatedStorage storage, + final FileIoProvider fileIoProvider) { this.storage = storage; + this.fileIoProvider = fileIoProvider; } @Override @@ -560,6 +564,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { } @Override + public FileIoProvider getFileIoProvider() { + return fileIoProvider; + } + + @Override public VolumeCheckResult check(VolumeCheckContext context) throws Exception { return VolumeCheckResult.HEALTHY; @@ -590,10 +599,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { } registerMBean(datanodeUuid); + this.fileIoProvider = new FileIoProvider(conf); this.storage = new SimulatedStorage( conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY), conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE)); - this.volume = new SimulatedVolume(this.storage); + this.volume = new SimulatedVolume(this.storage, this.fileIoProvider); this.datasetLock = new AutoCloseableLock(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 8439991..619eda0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -673,7 +673,7 @@ public class TestBlockRecovery { ReplicaOutputStreams streams = null; try { streams = replicaInfo.createStreams(true, - DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300); + DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); streams.getChecksumOut().write('a'); dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1)); BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous = http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index d7c8383..cc0915d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -905,6 +905,11 @@ public class TestDirectoryScanner { return null; } + @Override + public FileIoProvider getFileIoProvider() { + return null; + } + @Override public VolumeCheckResult check(VolumeCheckContext context) http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index fa980c2..4e724bc7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -83,7 +83,7 @@ public class TestSimulatedFSDataset { ReplicaInPipeline bInfo = fsdataset.createRbw( StorageType.DEFAULT, b, false).getReplica(); ReplicaOutputStreams out = bInfo.createStreams(true, - DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300); + DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); try { OutputStream dataOut = out.getDataOut(); assertEquals(0, fsdataset.getLength(b)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 2417c9d..5cd86e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -134,7 +134,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> { @Override public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, long ckoff) throws IOException { - return new ReplicaInputStreams(null, null, null); + return new ReplicaInputStreams(null, null, null, null); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java index 6fa2830..5c172e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java @@ -58,10 +58,10 @@ public class ExternalReplicaInPipeline implements ReplicaInPipeline { @Override public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum, long slowLogThresholdMs) + DataChecksum requestedChecksum) throws IOException { - return new ReplicaOutputStreams(null, null, requestedChecksum, false, - slowLogThresholdMs); + return new ReplicaOutputStreams(null, null, requestedChecksum, + null, null); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java index 2753a61..e607de5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java @@ -26,6 +26,7 @@ import java.util.LinkedList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.DF; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler; import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; @@ -115,6 +116,11 @@ public class ExternalVolumeImpl implements FsVolumeSpi { } @Override + public FileIoProvider getFileIoProvider() { + return null; + } + + @Override public VolumeCheckResult check(VolumeCheckContext context) throws Exception { return VolumeCheckResult.HEALTHY; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java index a089d39..3bac7b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java @@ -99,6 +99,8 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase { .add(DFSConfigKeys.DFS_DATANODE_STARTUP_KEY); configurationPropsToSkipCompare .add(DFSConfigKeys.DFS_NAMENODE_STARTUP_KEY); + configurationPropsToSkipCompare + .add(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY); // Allocate xmlPropsToSkipCompare = new HashSet<String>(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
