HDFS-10958. Add instrumentation hooks around Datanode disk IO.
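For context, a minimal sketch of how the hook interface introduced by this change could be implemented and plugged in. The class name SlowIoLoggingFileIoEvents and the 300 ms threshold are illustrative assumptions, not part of this patch; only the FileIoEvents methods, the FileIoProvider.OPERATION enum, and the dfs.datanode.fileio.events.class key come from the change itself.

package org.apache.hadoop.hdfs.server.datanode;

import javax.annotation.Nullable;

import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.Time;

/**
 * Illustrative only: logs any hooked DataNode file IO or metadata
 * operation that takes longer than a fixed threshold.
 */
public class SlowIoLoggingFileIoEvents implements FileIoEvents {
  private static final long THRESHOLD_MS = 300;   // assumed threshold

  @Override
  public long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op) {
    // The returned timestamp is handed back to afterMetadataOp/onFailure.
    return Time.monotonicNow();
  }

  @Override
  public void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op,
      long begin) {
    logIfSlow(volume, op, begin);
  }

  @Override
  public long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
      long len) {
    return Time.monotonicNow();
  }

  @Override
  public void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
      long begin, long len) {
    logIfSlow(volume, op, begin);
  }

  @Override
  public void onFailure(@Nullable FsVolumeSpi volume, OPERATION op,
      Exception e, long begin) {
    FileIoProvider.LOG.warn("Failed " + op + " on volume " + volume, e);
  }

  @Override
  public @Nullable String getStatistics() {
    // No statistics are kept; null is a valid return per the interface.
    return null;
  }

  private void logIfSlow(@Nullable FsVolumeSpi volume, OPERATION op,
      long begin) {
    final long elapsedMs = Time.monotonicNow() - begin;
    if (elapsedMs > THRESHOLD_MS) {
      FileIoProvider.LOG.warn(
          "Slow " + op + " on volume " + volume + " took " + elapsedMs + " ms");
    }
  }
}

Such a hook would be enabled by setting dfs.datanode.fileio.events.class to the implementing class in hdfs-site.xml. When the key is unset, FileIoProvider falls back to DefaultFileIoEvents, whose hooks are no-ops; CountingFileIoEvents (intended for testing) counts successes and failures per operation and exposes them as JSON through the new DataNodeMXBean#getFileIoProviderStatistics attribute.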
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6ba9587d Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6ba9587d Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6ba9587d Branch: refs/heads/YARN-5085 Commit: 6ba9587d370fbf39c129c08c00ebbb894ccc1389 Parents: 72bff19 Author: Arpit Agarwal <[email protected]> Authored: Wed Dec 14 11:18:58 2016 -0800 Committer: Arpit Agarwal <[email protected]> Committed: Wed Dec 14 11:18:58 2016 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/io/nativeio/NativeIO.java | 40 +- .../server/datanode/BlockMetadataHeader.java | 29 +- .../dev-support/findbugsExcludeFile.xml | 27 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 + .../hdfs/server/datanode/BlockReceiver.java | 14 +- .../hdfs/server/datanode/BlockSender.java | 10 +- .../server/datanode/CountingFileIoEvents.java | 107 ++ .../hadoop/hdfs/server/datanode/DataNode.java | 12 + .../hdfs/server/datanode/DataNodeMXBean.java | 5 + .../hdfs/server/datanode/DataStorage.java | 6 + .../hdfs/server/datanode/DatanodeUtil.java | 19 +- .../server/datanode/DefaultFileIoEvents.java | 67 ++ .../hdfs/server/datanode/FileIoEvents.java | 97 ++ .../hdfs/server/datanode/FileIoProvider.java | 1006 ++++++++++++++++++ .../hdfs/server/datanode/LocalReplica.java | 133 +-- .../server/datanode/LocalReplicaInPipeline.java | 43 +- .../hdfs/server/datanode/ReplicaInPipeline.java | 4 +- .../hdfs/server/datanode/ReplicaInfo.java | 17 +- .../server/datanode/fsdataset/FsVolumeSpi.java | 3 + .../datanode/fsdataset/ReplicaInputStreams.java | 11 +- .../fsdataset/ReplicaOutputStreams.java | 72 +- .../datanode/fsdataset/impl/BlockPoolSlice.java | 122 +-- .../impl/FsDatasetAsyncDiskService.java | 6 +- .../datanode/fsdataset/impl/FsDatasetImpl.java | 46 +- .../datanode/fsdataset/impl/FsDatasetUtil.java | 5 +- .../datanode/fsdataset/impl/FsVolumeImpl.java | 89 +- .../fsdataset/impl/FsVolumeImplBuilder.java | 12 +- .../org/apache/hadoop/hdfs/TestFileAppend.java | 2 +- .../server/datanode/SimulatedFSDataset.java | 18 +- .../hdfs/server/datanode/TestBlockRecovery.java | 2 +- .../server/datanode/TestDirectoryScanner.java | 5 + .../server/datanode/TestSimulatedFSDataset.java | 2 +- .../extdataset/ExternalDatasetImpl.java | 2 +- .../extdataset/ExternalReplicaInPipeline.java | 6 +- .../datanode/extdataset/ExternalVolumeImpl.java | 6 + .../hadoop/tools/TestHdfsConfigFields.java | 2 + 36 files changed, 1684 insertions(+), 365 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java index a123f18..f3ff1c7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java @@ -742,47 +742,19 @@ public class NativeIO { } /** - * Create a FileInputStream that shares delete permission on the - * file opened, i.e. other process can delete the file the - * FileInputStream is reading. 
Only Windows implementation uses - * the native interface. - */ - public static FileInputStream getShareDeleteFileInputStream(File f) - throws IOException { - if (!Shell.WINDOWS) { - // On Linux the default FileInputStream shares delete permission - // on the file opened. - // - return new FileInputStream(f); - } else { - // Use Windows native interface to create a FileInputStream that - // shares delete permission on the file opened. - // - FileDescriptor fd = Windows.createFile( - f.getAbsolutePath(), - Windows.GENERIC_READ, - Windows.FILE_SHARE_READ | - Windows.FILE_SHARE_WRITE | - Windows.FILE_SHARE_DELETE, - Windows.OPEN_EXISTING); - return new FileInputStream(fd); - } - } - - /** - * Create a FileInputStream that shares delete permission on the + * Create a FileDescriptor that shares delete permission on the * file opened at a given offset, i.e. other process can delete - * the file the FileInputStream is reading. Only Windows implementation + * the file the FileDescriptor is reading. Only Windows implementation * uses the native interface. */ - public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset) - throws IOException { + public static FileDescriptor getShareDeleteFileDescriptor( + File f, long seekOffset) throws IOException { if (!Shell.WINDOWS) { RandomAccessFile rf = new RandomAccessFile(f, "r"); if (seekOffset > 0) { rf.seek(seekOffset); } - return new FileInputStream(rf.getFD()); + return rf.getFD(); } else { // Use Windows native interface to create a FileInputStream that // shares delete permission on the file opened, and set it to the @@ -797,7 +769,7 @@ public class NativeIO { NativeIO.Windows.OPEN_EXISTING); if (seekOffset > 0) NativeIO.Windows.setFilePointer(fd, seekOffset, NativeIO.Windows.FILE_BEGIN); - return new FileInputStream(fd); + return fd; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java index eb19492..738f496 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java @@ -31,7 +31,6 @@ import java.nio.channels.FileChannel; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DataChecksum; import com.google.common.annotations.VisibleForTesting; @@ -79,18 +78,15 @@ public class BlockMetadataHeader { /** * Read the checksum header from the meta file. + * inputStream must be closed by the caller. * @return the data checksum obtained from the header. 
*/ - public static DataChecksum readDataChecksum(File metaFile, int bufSize) + public static DataChecksum readDataChecksum( + FileInputStream inputStream, int bufSize, File metaFile) throws IOException { - DataInputStream in = null; - try { - in = new DataInputStream(new BufferedInputStream( - new FileInputStream(metaFile), bufSize)); - return readDataChecksum(in, metaFile); - } finally { - IOUtils.closeStream(in); - } + DataInputStream in = new DataInputStream(new BufferedInputStream( + inputStream, bufSize)); + return readDataChecksum(in, metaFile); } /** @@ -111,6 +107,7 @@ public class BlockMetadataHeader { /** * Read the header without changing the position of the FileChannel. + * This is used by the client for short-circuit reads. * * @param fc The FileChannel to read. * @return the Metadata Header. @@ -144,18 +141,16 @@ public class BlockMetadataHeader { /** * Reads header at the top of metadata file and returns the header. + * Closes the input stream after reading the header. * * @return metadata header for the block * @throws IOException */ - public static BlockMetadataHeader readHeader(File file) throws IOException { - DataInputStream in = null; - try { - in = new DataInputStream(new BufferedInputStream( - new FileInputStream(file))); + public static BlockMetadataHeader readHeader( + FileInputStream fis) throws IOException { + try (DataInputStream in = new DataInputStream( + new BufferedInputStream(fis))) { return readHeader(in); - } finally { - IOUtils.closeStream(in); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml index e6e4057..3fa4e8d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml @@ -74,6 +74,33 @@ </Match> <!-- + This class exposes stream constructors. The newly created streams are not + supposed to be closed in the constructor. Ignore the OBL warning. + --> + <Match> + <Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedFileOutputStream" /> + <Bug pattern="OBL_UNSATISFIED_OBLIGATION" /> + </Match> + + <!-- + This class exposes stream constructors. The newly created streams are not + supposed to be closed in the constructor. Ignore the OBL warning. + --> + <Match> + <Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedFileInputStream" /> + <Bug pattern="OBL_UNSATISFIED_OBLIGATION" /> + </Match> + + <!-- + This class exposes stream constructors. The newly created streams are not + supposed to be closed in the constructor. Ignore the OBL warning. + --> + <Match> + <Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedRandomAccessFile" /> + <Bug pattern="OBL_UNSATISFIED_OBLIGATION" /> + </Match> + + <!-- lastAppliedTxid is carefully unsynchronized in the BackupNode in a couple spots. See the comments in BackupImage for justification. 
--> http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index df21857..cffc4bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -687,6 +687,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins"; public static final String DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory"; public static final String DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY = "dfs.datanode.fsdataset.volume.choosing.policy"; + public static final String DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY = + "dfs.datanode.fileio.events.class"; public static final String DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold"; public static final long DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT = 1024L * 1024L * 1024L * 10L; // 10 GB public static final String DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index f372072..441bd91 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -244,8 +244,7 @@ class BlockReceiver implements Closeable { final boolean isCreate = isDatanode || isTransfer || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE; - streams = replicaInfo.createStreams(isCreate, requestedChecksum, - datanodeSlowLogThresholdMs); + streams = replicaInfo.createStreams(isCreate, requestedChecksum); assert streams != null : "null streams!"; // read checksum meta information @@ -400,9 +399,8 @@ class BlockReceiver implements Closeable { checksumOut.flush(); long flushEndNanos = System.nanoTime(); if (isSync) { - long fsyncStartNanos = flushEndNanos; streams.syncChecksumOut(); - datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos); + datanode.metrics.addFsyncNanos(System.nanoTime() - flushEndNanos); } flushTotalNanos += flushEndNanos - flushStartNanos; } @@ -703,8 +701,10 @@ class BlockReceiver implements Closeable { int numBytesToDisk = (int)(offsetInBlock-onDiskLen); // Write data to disk. 
- long duration = streams.writeToDisk(dataBuf.array(), + long begin = Time.monotonicNow(); + streams.writeDataToDisk(dataBuf.array(), startByteToDisk, numBytesToDisk); + long duration = Time.monotonicNow() - begin; if (duration > maxWriteToDiskMs) { maxWriteToDiskMs = duration; @@ -1029,9 +1029,7 @@ class BlockReceiver implements Closeable { * will be overwritten. */ private void adjustCrcFilePosition() throws IOException { - if (streams.getDataOut() != null) { - streams.flushDataOut(); - } + streams.flushDataOut(); if (checksumOut != null) { checksumOut.flush(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 9182c88..d7aebd8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -166,6 +166,7 @@ class BlockSender implements java.io.Closeable { private final boolean dropCacheBehindAllReads; private long lastCacheDropOffset; + private final FileIoProvider fileIoProvider; @VisibleForTesting static long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB @@ -197,6 +198,7 @@ class BlockSender implements java.io.Closeable { InputStream blockIn = null; DataInputStream checksumIn = null; FsVolumeReference volumeRef = null; + this.fileIoProvider = datanode.getFileIoProvider(); try { this.block = block; this.corruptChecksumOk = corruptChecksumOk; @@ -401,7 +403,8 @@ class BlockSender implements java.io.Closeable { DataNode.LOG.debug("replica=" + replica); } blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset - ris = new ReplicaInputStreams(blockIn, checksumIn, volumeRef); + ris = new ReplicaInputStreams( + blockIn, checksumIn, volumeRef, fileIoProvider); } catch (IOException ioe) { IOUtils.closeStream(this); throw ioe; @@ -568,8 +571,9 @@ class BlockSender implements java.io.Closeable { FileChannel fileCh = ((FileInputStream)ris.getDataIn()).getChannel(); LongWritable waitTime = new LongWritable(); LongWritable transferTime = new LongWritable(); - sockOut.transferToFully(fileCh, blockInPosition, dataLen, - waitTime, transferTime); + fileIoProvider.transferToSocketFully( + ris.getVolumeRef().getVolume(), sockOut, fileCh, blockInPosition, + dataLen, waitTime, transferTime); datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get()); datanode.metrics.addSendDataPacketTransferNanos(transferTime.get()); blockInPosition += dataLen; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java new file mode 100644 index 0000000..a70c151 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java @@ -0,0 +1,107 @@ +/** + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * {@link FileIoEvents} that simply counts the number of operations. + * Not meant to be used outside of testing. + */ [email protected] [email protected] +public class CountingFileIoEvents implements FileIoEvents { + private final Map<OPERATION, Counts> counts; + + private static class Counts { + private final AtomicLong successes = new AtomicLong(0); + private final AtomicLong failures = new AtomicLong(0); + + @JsonProperty("Successes") + public long getSuccesses() { + return successes.get(); + } + + @JsonProperty("Failures") + public long getFailures() { + return failures.get(); + } + } + + public CountingFileIoEvents() { + counts = new HashMap<>(); + for (OPERATION op : OPERATION.values()) { + counts.put(op, new Counts()); + } + } + + @Override + public long beforeMetadataOp( + @Nullable FsVolumeSpi volume, OPERATION op) { + return 0; + } + + @Override + public void afterMetadataOp( + @Nullable FsVolumeSpi volume, OPERATION op, long begin) { + counts.get(op).successes.incrementAndGet(); + } + + @Override + public long beforeFileIo( + @Nullable FsVolumeSpi volume, OPERATION op, long len) { + return 0; + } + + @Override + public void afterFileIo( + @Nullable FsVolumeSpi volume, OPERATION op, long begin, long len) { + counts.get(op).successes.incrementAndGet(); + } + + @Override + public void onFailure( + @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) { + counts.get(op).failures.incrementAndGet(); + + } + + @Override + public String getStatistics() { + ObjectMapper objectMapper = new ObjectMapper(); + try { + return objectMapper.writeValueAsString(counts); + } catch (JsonProcessingException e) { + // Failed to serialize. Don't log the exception call stack. 
+ FileIoProvider.LOG.error("Failed to serialize statistics" + e); + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index b845da0..794b1ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -299,6 +299,7 @@ public class DataNode extends ReconfigurableBase public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog"); private static final String DATANODE_HTRACE_PREFIX = "datanode.htrace."; + private final FileIoProvider fileIoProvider; /** * Use {@link NetUtils#createSocketAddr(String)} instead. @@ -411,6 +412,7 @@ public class DataNode extends ReconfigurableBase this.tracer = createTracer(conf); this.tracerConfigurationManager = new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf); + this.fileIoProvider = new FileIoProvider(conf); this.fileDescriptorPassingDisabledReason = null; this.maxNumberOfBlocksToLog = 0; this.confVersion = null; @@ -437,6 +439,7 @@ public class DataNode extends ReconfigurableBase this.tracer = createTracer(conf); this.tracerConfigurationManager = new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf); + this.fileIoProvider = new FileIoProvider(conf); this.blockScanner = new BlockScanner(this); this.lastDiskErrorCheck = 0; this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY, @@ -617,6 +620,10 @@ public class DataNode extends ReconfigurableBase PipelineAck.ECN.SUPPORTED; } + public FileIoProvider getFileIoProvider() { + return fileIoProvider; + } + /** * Contains the StorageLocations for changed data volumes. */ @@ -3008,6 +3015,11 @@ public class DataNode extends ReconfigurableBase } } + @Override // DataNodeMXBean + public String getFileIoProviderStatistics() { + return fileIoProvider.getStatistics(); + } + public void refreshNamenodes(Configuration conf) throws IOException { blockPoolManager.refreshNamenodes(conf); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java index 90c38d7..37f9635 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java @@ -120,4 +120,9 @@ public interface DataNodeMXBean { * @return DiskBalancer Status */ String getDiskBalancerStatus(); + + /** + * Gets the {@link FileIoProvider} statistics. 
+ */ + String getFileIoProviderStatistics(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index f4deb6d..5163e6b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -1356,6 +1356,12 @@ public class DataStorage extends Storage { bpStorageMap.remove(bpId); } + /** + * Prefer FileIoProvider#fullydelete. + * @param dir + * @return + */ + @Deprecated public static boolean fullyDelete(final File dir) { boolean result = FileUtil.fullyDelete(dir); return result; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java index ad054a8..c98ff54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java @@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; /** Provide utility methods for Datanode. */ @@ -55,15 +56,17 @@ public class DatanodeUtil { * @throws IOException * if the file already exists or if the file cannot be created. */ - public static File createTmpFile(Block b, File f) throws IOException { - if (f.exists()) { + public static File createFileWithExistsCheck( + FsVolumeSpi volume, Block b, File f, + FileIoProvider fileIoProvider) throws IOException { + if (fileIoProvider.exists(volume, f)) { throw new IOException("Failed to create temporary file for " + b + ". 
File " + f + " should not be present, but is."); } // Create the zero-length temp file final boolean fileCreated; try { - fileCreated = f.createNewFile(); + fileCreated = fileIoProvider.createFile(volume, f); } catch (IOException ioe) { throw new IOException(DISK_ERROR + "Failed to create " + f, ioe); } @@ -92,13 +95,17 @@ public class DatanodeUtil { * @return true if there are no files * @throws IOException if unable to list subdirectories */ - public static boolean dirNoFilesRecursive(File dir) throws IOException { - File[] contents = dir.listFiles(); + public static boolean dirNoFilesRecursive( + FsVolumeSpi volume, File dir, + FileIoProvider fileIoProvider) throws IOException { + File[] contents = fileIoProvider.listFiles(volume, dir); if (contents == null) { throw new IOException("Cannot list contents of " + dir); } for (File f : contents) { - if (!f.isDirectory() || (f.isDirectory() && !dirNoFilesRecursive(f))) { + if (!f.isDirectory() || + (f.isDirectory() && !dirNoFilesRecursive( + volume, f, fileIoProvider))) { return false; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java new file mode 100644 index 0000000..bd4932b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode; + + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; + +import javax.annotation.Nullable; + +/** + * The default implementation of {@link FileIoEvents} that do nothing. 
+ */ [email protected] [email protected] +public final class DefaultFileIoEvents implements FileIoEvents { + @Override + public long beforeMetadataOp( + @Nullable FsVolumeSpi volume, OPERATION op) { + return 0; + } + + @Override + public void afterMetadataOp( + @Nullable FsVolumeSpi volume, OPERATION op, long begin) { + } + + @Override + public long beforeFileIo( + @Nullable FsVolumeSpi volume, OPERATION op, long len) { + return 0; + } + + @Override + public void afterFileIo( + @Nullable FsVolumeSpi volume, OPERATION op, long begin, long len) { + } + + @Override + public void onFailure( + @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) { + } + + @Override + public @Nullable String getStatistics() { + // null is valid JSON. + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java new file mode 100644 index 0000000..48e703f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode; + + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; + +import javax.annotation.Nullable; + +/** + * The following hooks can be implemented for instrumentation/fault + * injection. + */ [email protected] [email protected] +public interface FileIoEvents { + + /** + * Invoked before a filesystem metadata operation. + * + * @param volume target volume for the operation. Null if unavailable. + * @param op type of operation. + * @return timestamp at which the operation was started. 0 if + * unavailable. + */ + long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op); + + /** + * Invoked after a filesystem metadata operation has completed. + * + * @param volume target volume for the operation. Null if unavailable. + * @param op type of operation. + * @param begin timestamp at which the operation was started. 0 + * if unavailable. + */ + void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op, long begin); + + /** + * Invoked before a read/write/flush/channel transfer operation. 
+ * + * @param volume target volume for the operation. Null if unavailable. + * @param op type of operation. + * @param len length of the file IO. 0 for flush. + * @return timestamp at which the operation was started. 0 if + * unavailable. + */ + long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op, long len); + + + /** + * Invoked after a read/write/flush/channel transfer operation + * has completed. + * + * @param volume target volume for the operation. Null if unavailable. + * @param op type of operation. + * @param len of the file IO. 0 for flush. + * @return timestamp at which the operation was started. 0 if + * unavailable. + */ + void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op, + long begin, long len); + + /** + * Invoked if an operation fails with an exception. + * @param volume target volume for the operation. Null if unavailable. + * @param op type of operation. + * @param e Exception encountered during the operation. + * @param begin time at which the operation was started. + */ + void onFailure( + @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin); + + /** + * Return statistics as a JSON string. + * @return + */ + @Nullable String getStatistics(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java new file mode 100644 index 0000000..2344114 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java @@ -0,0 +1,1006 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.datanode; + + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.HardLink; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.io.nativeio.NativeIOException; +import org.apache.hadoop.net.SocketOutputStream; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.File; +import java.io.FileDescriptor; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.io.Flushable; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.nio.file.CopyOption; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.*; + +/** + * This class abstracts out various file IO operations performed by the + * DataNode and invokes event hooks before and after each file IO. + * + * Behavior can be injected into these events by implementing + * {@link FileIoEvents} and replacing the default implementation + * with {@link DFSConfigKeys#DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY}. + * + * Most functions accept an optional {@link FsVolumeSpi} parameter for + * instrumentation/logging. + * + * Some methods may look redundant, especially the multiple variations of + * move/rename/list. They exist to retain behavior compatibility for existing + * code. + */ [email protected] [email protected] +public class FileIoProvider { + public static final Logger LOG = LoggerFactory.getLogger( + FileIoProvider.class); + + private final FileIoEvents eventHooks; + + /** + * @param conf Configuration object. May be null. When null, + * the event handlers are no-ops. + */ + public FileIoProvider(@Nullable Configuration conf) { + if (conf != null) { + final Class<? extends FileIoEvents> clazz = conf.getClass( + DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY, + DefaultFileIoEvents.class, + FileIoEvents.class); + eventHooks = ReflectionUtils.newInstance(clazz, conf); + } else { + eventHooks = new DefaultFileIoEvents(); + } + } + + /** + * Lists the types of file system operations. Passed to the + * IO hooks so implementations can choose behavior based on + * specific operations. + */ + public enum OPERATION { + OPEN, + EXISTS, + LIST, + DELETE, + MOVE, + MKDIRS, + TRANSFER, + SYNC, + FADVISE, + READ, + WRITE, + FLUSH, + NATIVE_COPY + } + + /** + * Retrieve statistics from the underlying {@link FileIoEvents} + * implementation as a JSON string, if it maintains them. + * @return statistics as a JSON string. May be null. + */ + public @Nullable String getStatistics() { + return eventHooks.getStatistics(); + } + + /** + * See {@link Flushable#flush()}. + * + * @param volume target volume. null if unavailable. 
+ * @throws IOException + */ + public void flush( + @Nullable FsVolumeSpi volume, Flushable f) throws IOException { + final long begin = eventHooks.beforeFileIo(volume, FLUSH, 0); + try { + f.flush(); + eventHooks.afterFileIo(volume, FLUSH, begin, 0); + } catch (Exception e) { + eventHooks.onFailure(volume, FLUSH, e, begin); + throw e; + } + } + + /** + * Sync the given {@link FileOutputStream}. + * + * @param volume target volume. null if unavailable. + * @throws IOException + */ + public void sync( + @Nullable FsVolumeSpi volume, FileOutputStream fos) throws IOException { + final long begin = eventHooks.beforeFileIo(volume, SYNC, 0); + try { + fos.getChannel().force(true); + eventHooks.afterFileIo(volume, SYNC, begin, 0); + } catch (Exception e) { + eventHooks.onFailure(volume, SYNC, e, begin); + throw e; + } + } + + /** + * Call sync_file_range on the given file descriptor. + * + * @param volume target volume. null if unavailable. + * @throws IOException + */ + public void syncFileRange( + @Nullable FsVolumeSpi volume, FileDescriptor outFd, + long offset, long numBytes, int flags) throws NativeIOException { + final long begin = eventHooks.beforeFileIo(volume, SYNC, 0); + try { + NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, numBytes, flags); + eventHooks.afterFileIo(volume, SYNC, begin, 0); + } catch (Exception e) { + eventHooks.onFailure(volume, SYNC, e, begin); + throw e; + } + } + + /** + * Call posix_fadvise on the given file descriptor. + * + * @param volume target volume. null if unavailable. + * @throws IOException + */ + public void posixFadvise( + @Nullable FsVolumeSpi volume, String identifier, FileDescriptor outFd, + long offset, long length, int flags) throws NativeIOException { + final long begin = eventHooks.beforeMetadataOp(volume, FADVISE); + try { + NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( + identifier, outFd, offset, length, flags); + eventHooks.afterMetadataOp(volume, FADVISE, begin); + } catch (Exception e) { + eventHooks.onFailure(volume, FADVISE, e, begin); + throw e; + } + } + + /** + * Delete a file. + * @param volume target volume. null if unavailable. + * @param f File to delete. + * @return true if the file was successfully deleted. + */ + public boolean delete(@Nullable FsVolumeSpi volume, File f) { + final long begin = eventHooks.beforeMetadataOp(volume, DELETE); + try { + boolean deleted = f.delete(); + eventHooks.afterMetadataOp(volume, DELETE, begin); + return deleted; + } catch (Exception e) { + eventHooks.onFailure(volume, DELETE, e, begin); + throw e; + } + } + + /** + * Delete a file, first checking to see if it exists. + * @param volume target volume. null if unavailable. + * @param f File to delete + * @return true if the file was successfully deleted or if it never + * existed. + */ + public boolean deleteWithExistsCheck(@Nullable FsVolumeSpi volume, File f) { + final long begin = eventHooks.beforeMetadataOp(volume, DELETE); + try { + boolean deleted = !f.exists() || f.delete(); + eventHooks.afterMetadataOp(volume, DELETE, begin); + if (!deleted) { + LOG.warn("Failed to delete file {}", f); + } + return deleted; + } catch (Exception e) { + eventHooks.onFailure(volume, DELETE, e, begin); + throw e; + } + } + + /** + * Transfer data from a FileChannel to a SocketOutputStream. + * + * @param volume target volume. null if unavailable. + * @param sockOut SocketOutputStream to write the data. + * @param fileCh FileChannel from which to read data. + * @param position position within the channel where the transfer begins. 
+ * @param count number of bytes to transfer. + * @param waitTime returns the nanoseconds spent waiting for the socket + * to become writable. + * @param transferTime returns the nanoseconds spent transferring data. + * @throws IOException + */ + public void transferToSocketFully( + @Nullable FsVolumeSpi volume, SocketOutputStream sockOut, + FileChannel fileCh, long position, int count, + LongWritable waitTime, LongWritable transferTime) throws IOException { + final long begin = eventHooks.beforeFileIo(volume, TRANSFER, count); + try { + sockOut.transferToFully(fileCh, position, count, + waitTime, transferTime); + eventHooks.afterFileIo(volume, TRANSFER, begin, count); + } catch (Exception e) { + eventHooks.onFailure(volume, TRANSFER, e, begin); + throw e; + } + } + + /** + * Create a file. + * @param volume target volume. null if unavailable. + * @param f File to be created. + * @return true if the file does not exist and was successfully created. + * false if the file already exists. + * @throws IOException + */ + public boolean createFile( + @Nullable FsVolumeSpi volume, File f) throws IOException { + final long begin = eventHooks.beforeMetadataOp(volume, OPEN); + try { + boolean created = f.createNewFile(); + eventHooks.afterMetadataOp(volume, OPEN, begin); + return created; + } catch (Exception e) { + eventHooks.onFailure(volume, OPEN, e, begin); + throw e; + } + } + + /** + * Create a FileInputStream using + * {@link FileInputStream#FileInputStream(File)}. + * + * Wraps the created input stream to intercept read calls + * before delegating to the wrapped stream. + * + * @param volume target volume. null if unavailable. + * @param f File object. + * @return FileInputStream to the given file. + * @throws FileNotFoundException + */ + public FileInputStream getFileInputStream( + @Nullable FsVolumeSpi volume, File f) throws FileNotFoundException { + final long begin = eventHooks.beforeMetadataOp(volume, OPEN); + FileInputStream fis = null; + try { + fis = new WrappedFileInputStream(volume, f); + eventHooks.afterMetadataOp(volume, OPEN, begin); + return fis; + } catch(Exception e) { + org.apache.commons.io.IOUtils.closeQuietly(fis); + eventHooks.onFailure(volume, OPEN, e, begin); + throw e; + } + } + + /** + * Create a FileOutputStream using + * {@link FileOutputStream#FileOutputStream(File, boolean)}. + * + * Wraps the created output stream to intercept write calls + * before delegating to the wrapped stream. + * + * @param volume target volume. null if unavailable. + * @param f File object. + * @param append if true, then bytes will be written to the end of the + * file rather than the beginning. + * @param FileOutputStream to the given file object. + * @throws FileNotFoundException + */ + public FileOutputStream getFileOutputStream( + @Nullable FsVolumeSpi volume, File f, + boolean append) throws FileNotFoundException { + final long begin = eventHooks.beforeMetadataOp(volume, OPEN); + FileOutputStream fos = null; + try { + fos = new WrappedFileOutputStream(volume, f, append); + eventHooks.afterMetadataOp(volume, OPEN, begin); + return fos; + } catch(Exception e) { + org.apache.commons.io.IOUtils.closeQuietly(fos); + eventHooks.onFailure(volume, OPEN, e, begin); + throw e; + } + } + + /** + * Create a FileOutputStream using + * {@link FileOutputStream#FileOutputStream(File, boolean)}. + * + * Wraps the created output stream to intercept write calls + * before delegating to the wrapped stream. + * + * @param volume target volume. null if unavailable. + * @param f File object. 
+ * @return FileOutputStream to the given file object. + * @throws FileNotFoundException + */ + public FileOutputStream getFileOutputStream( + @Nullable FsVolumeSpi volume, File f) throws FileNotFoundException { + return getFileOutputStream(volume, f, false); + } + + /** + * Create a FileOutputStream using + * {@link FileOutputStream#FileOutputStream(FileDescriptor)}. + * + * Wraps the created output stream to intercept write calls + * before delegating to the wrapped stream. + * + * @param volume target volume. null if unavailable. + * @param f File object. + * @return FileOutputStream to the given file object. + * @throws FileNotFoundException + */ + public FileOutputStream getFileOutputStream( + @Nullable FsVolumeSpi volume, FileDescriptor fd) { + return new WrappedFileOutputStream(volume, fd); + } + + /** + * Create a FileInputStream using + * {@link NativeIO#getShareDeleteFileDescriptor}. + * Wraps the created input stream to intercept input calls + * before delegating to the wrapped stream. + * + * @param volume target volume. null if unavailable. + * @param f File object. + * @param offset the offset position, measured in bytes from the + * beginning of the file, at which to set the file + * pointer. + * @return FileOutputStream to the given file object. + * @throws FileNotFoundException + */ + public FileInputStream getShareDeleteFileInputStream( + @Nullable FsVolumeSpi volume, File f, + long offset) throws IOException { + final long begin = eventHooks.beforeMetadataOp(volume, OPEN); + FileInputStream fis = null; + try { + fis = new WrappedFileInputStream(volume, + NativeIO.getShareDeleteFileDescriptor(f, offset)); + eventHooks.afterMetadataOp(volume, OPEN, begin); + return fis; + } catch(Exception e) { + org.apache.commons.io.IOUtils.closeQuietly(fis); + eventHooks.onFailure(volume, OPEN, e, begin); + throw e; + } + } + + /** + * Create a FileInputStream using + * {@link FileInputStream#FileInputStream(File)} and position + * it at the given offset. + * + * Wraps the created input stream to intercept read calls + * before delegating to the wrapped stream. + * + * @param volume target volume. null if unavailable. + * @param f File object. + * @param offset the offset position, measured in bytes from the + * beginning of the file, at which to set the file + * pointer. + * @throws FileNotFoundException + */ + public FileInputStream openAndSeek( + @Nullable FsVolumeSpi volume, File f, long offset) throws IOException { + final long begin = eventHooks.beforeMetadataOp(volume, OPEN); + FileInputStream fis = null; + try { + fis = new WrappedFileInputStream(volume, + FsDatasetUtil.openAndSeek(f, offset)); + eventHooks.afterMetadataOp(volume, OPEN, begin); + return fis; + } catch(Exception e) { + org.apache.commons.io.IOUtils.closeQuietly(fis); + eventHooks.onFailure(volume, OPEN, e, begin); + throw e; + } + } + + /** + * Create a RandomAccessFile using + * {@link RandomAccessFile#RandomAccessFile(File, String)}. + * + * Wraps the created input stream to intercept IO calls + * before delegating to the wrapped RandomAccessFile. + * + * @param volume target volume. null if unavailable. + * @param f File object. + * @param mode See {@link RandomAccessFile} for a description + * of the mode string. + * @return RandomAccessFile representing the given file. 
+ * @throws FileNotFoundException + */ + public RandomAccessFile getRandomAccessFile( + @Nullable FsVolumeSpi volume, File f, + String mode) throws FileNotFoundException { + final long begin = eventHooks.beforeMetadataOp(volume, OPEN); + RandomAccessFile raf = null; + try { + raf = new WrappedRandomAccessFile(volume, f, mode); + eventHooks.afterMetadataOp(volume, OPEN, begin); + return raf; + } catch(Exception e) { + org.apache.commons.io.IOUtils.closeQuietly(raf); + eventHooks.onFailure(volume, OPEN, e, begin); + throw e; + } + } + + /** + * Delete the given directory using {@link FileUtil#fullyDelete(File)}. + * + * @param volume target volume. null if unavailable. + * @param dir directory to be deleted. + * @return true on success false on failure. + */ + public boolean fullyDelete(@Nullable FsVolumeSpi volume, File dir) { + final long begin = eventHooks.beforeMetadataOp(volume, DELETE); + try { + boolean deleted = FileUtil.fullyDelete(dir); + eventHooks.afterMetadataOp(volume, DELETE, begin); + return deleted; + } catch(Exception e) { + eventHooks.onFailure(volume, DELETE, e, begin); + throw e; + } + } + + /** + * Move the src file to the target using + * {@link FileUtil#replaceFile(File, File)}. + * + * @param volume target volume. null if unavailable. + * @param src source path. + * @param target target path. + * @throws IOException + */ + public void replaceFile( + @Nullable FsVolumeSpi volume, File src, File target) throws IOException { + final long begin = eventHooks.beforeMetadataOp(volume, MOVE); + try { + FileUtil.replaceFile(src, target); + eventHooks.afterMetadataOp(volume, MOVE, begin); + } catch(Exception e) { + eventHooks.onFailure(volume, MOVE, e, begin); + throw e; + } + } + + /** + * Move the src file to the target using + * {@link Storage#rename(File, File)}. + * + * @param volume target volume. null if unavailable. + * @param src source path. + * @param target target path. + * @throws IOException + */ + public void rename( + @Nullable FsVolumeSpi volume, File src, File target) + throws IOException { + final long begin = eventHooks.beforeMetadataOp(volume, MOVE); + try { + Storage.rename(src, target); + eventHooks.afterMetadataOp(volume, MOVE, begin); + } catch(Exception e) { + eventHooks.onFailure(volume, MOVE, e, begin); + throw e; + } + } + + /** + * Move the src file to the target using + * {@link FileUtils#moveFile(File, File)}. + * + * @param volume target volume. null if unavailable. + * @param src source path. + * @param target target path. + * @throws IOException + */ + public void moveFile( + @Nullable FsVolumeSpi volume, File src, File target) + throws IOException { + final long begin = eventHooks.beforeMetadataOp(volume, MOVE); + try { + FileUtils.moveFile(src, target); + eventHooks.afterMetadataOp(volume, MOVE, begin); + } catch(Exception e) { + eventHooks.onFailure(volume, MOVE, e, begin); + throw e; + } + } + + /** + * Move the src file to the target using + * {@link Files#move(Path, Path, CopyOption...)}. + * + * @param volume target volume. null if unavailable. + * @param src source path. + * @param target target path. + * @param options See {@link Files#move} for a description + * of the options. + * @throws IOException + */ + public void move( + @Nullable FsVolumeSpi volume, Path src, Path target, + CopyOption... 
options) throws IOException { + final long begin = eventHooks.beforeMetadataOp(volume, MOVE); + try { + Files.move(src, target, options); + eventHooks.afterMetadataOp(volume, MOVE, begin); + } catch(Exception e) { + eventHooks.onFailure(volume, MOVE, e, begin); + throw e; + } + } + + /** + * See {@link Storage#nativeCopyFileUnbuffered(File, File, boolean)}. + * + * @param volume target volume. null if unavailable. + * @param src an existing file to copy, must not be {@code null} + * @param target the new file, must not be {@code null} + * @param preserveFileDate true if the file date of the copy + * should be the same as the original + * @throws IOException + */ + public void nativeCopyFileUnbuffered( + @Nullable FsVolumeSpi volume, File src, File target, + boolean preserveFileDate) throws IOException { + final long length = src.length(); + final long begin = eventHooks.beforeFileIo(volume, NATIVE_COPY, length); + try { + Storage.nativeCopyFileUnbuffered(src, target, preserveFileDate); + eventHooks.afterFileIo(volume, NATIVE_COPY, begin, length); + } catch(Exception e) { + eventHooks.onFailure(volume, NATIVE_COPY, e, begin); + throw e; + } + } + + /** + * See {@link File#mkdirs()}. + * + * @param volume target volume. null if unavailable. + * @param dir directory to be created. + * @return true only if the directory was created. false if + * the directory already exists. + * @throws IOException if a directory with the given name does + * not exist and could not be created. + */ + public boolean mkdirs( + @Nullable FsVolumeSpi volume, File dir) throws IOException { + final long begin = eventHooks.beforeMetadataOp(volume, MKDIRS); + boolean created = false; + boolean isDirectory; + try { + created = dir.mkdirs(); + isDirectory = !created && dir.isDirectory(); + eventHooks.afterMetadataOp(volume, MKDIRS, begin); + } catch(Exception e) { + eventHooks.onFailure(volume, MKDIRS, e, begin); + throw e; + } + + if (!created && !isDirectory) { + throw new IOException("Mkdirs failed to create " + dir); + } + return created; + } + + /** + * Create the target directory using {@link File#mkdirs()} only if + * it doesn't exist already. + * + * @param volume target volume. null if unavailable. + * @param dir directory to be created. + * @throws IOException if the directory could not created + */ + public void mkdirsWithExistsCheck( + @Nullable FsVolumeSpi volume, File dir) throws IOException { + final long begin = eventHooks.beforeMetadataOp(volume, MKDIRS); + boolean succeeded = false; + try { + succeeded = dir.isDirectory() || dir.mkdirs(); + eventHooks.afterMetadataOp(volume, MKDIRS, begin); + } catch(Exception e) { + eventHooks.onFailure(volume, MKDIRS, e, begin); + throw e; + } + + if (!succeeded) { + throw new IOException("Mkdirs failed to create " + dir); + } + } + + /** + * Get a listing of the given directory using + * {@link FileUtil#listFiles(File)}. + * + * @param volume target volume. null if unavailable. + * @param dir Directory to be listed. + * @return array of file objects representing the directory entries. + * @throws IOException + */ + public File[] listFiles( + @Nullable FsVolumeSpi volume, File dir) throws IOException { + final long begin = eventHooks.beforeMetadataOp(volume, LIST); + try { + File[] children = FileUtil.listFiles(dir); + eventHooks.afterMetadataOp(volume, LIST, begin); + return children; + } catch(Exception e) { + eventHooks.onFailure(volume, LIST, e, begin); + throw e; + } + } + + /** + * Get a listing of the given directory using + * {@link FileUtil#listFiles(File)}. 
+ * + * @param volume target volume. null if unavailable. + * @param Driectory to be listed. + * @return array of strings representing the directory entries. + * @throws IOException + */ + public String[] list( + @Nullable FsVolumeSpi volume, File dir) throws IOException { + final long begin = eventHooks.beforeMetadataOp(volume, LIST); + try { + String[] children = FileUtil.list(dir); + eventHooks.afterMetadataOp(volume, LIST, begin); + return children; + } catch(Exception e) { + eventHooks.onFailure(volume, LIST, e, begin); + throw e; + } + } + + /** + * Get a listing of the given directory using + * {@link IOUtils#listDirectory(File, FilenameFilter)}. + * + * @param volume target volume. null if unavailable. + * @param dir Directory to list. + * @param filter {@link FilenameFilter} to filter the directory entries. + * @throws IOException + */ + public List<String> listDirectory( + @Nullable FsVolumeSpi volume, File dir, + FilenameFilter filter) throws IOException { + final long begin = eventHooks.beforeMetadataOp(volume, LIST); + try { + List<String> children = IOUtils.listDirectory(dir, filter); + eventHooks.afterMetadataOp(volume, LIST, begin); + return children; + } catch(Exception e) { + eventHooks.onFailure(volume, LIST, e, begin); + throw e; + } + } + + /** + * Retrieves the number of links to the specified file. + * + * @param volume target volume. null if unavailable. + * @param f file whose link count is being queried. + * @return number of hard-links to the given file, including the + * given path itself. + * @throws IOException + */ + public int getHardLinkCount( + @Nullable FsVolumeSpi volume, File f) throws IOException { + final long begin = eventHooks.beforeMetadataOp(volume, LIST); + try { + int count = HardLink.getLinkCount(f); + eventHooks.afterMetadataOp(volume, LIST, begin); + return count; + } catch(Exception e) { + eventHooks.onFailure(volume, LIST, e, begin); + throw e; + } + } + + /** + * Check for file existence using {@link File#exists()}. + * + * @param volume target volume. null if unavailable. + * @param f file object. + * @return true if the file exists. + */ + public boolean exists(@Nullable FsVolumeSpi volume, File f) { + final long begin = eventHooks.beforeMetadataOp(volume, EXISTS); + try { + boolean exists = f.exists(); + eventHooks.afterMetadataOp(volume, EXISTS, begin); + return exists; + } catch(Exception e) { + eventHooks.onFailure(volume, EXISTS, e, begin); + throw e; + } + } + + /** + * A thin wrapper over {@link FileInputStream} that allows + * instrumenting disk IO. + */ + private final class WrappedFileInputStream extends FileInputStream { + private @Nullable final FsVolumeSpi volume; + + /** + * {@inheritDoc}. + */ + private WrappedFileInputStream(@Nullable FsVolumeSpi volume, File f) + throws FileNotFoundException { + super(f); + this.volume = volume; + } + + /** + * {@inheritDoc}. + */ + private WrappedFileInputStream( + @Nullable FsVolumeSpi volume, FileDescriptor fd) { + super(fd); + this.volume = volume; + } + + /** + * {@inheritDoc}. + */ + @Override + public int read() throws IOException { + final long begin = eventHooks.beforeFileIo(volume, READ, 1); + try { + int b = super.read(); + eventHooks.afterFileIo(volume, READ, begin, 1); + return b; + } catch(Exception e) { + eventHooks.onFailure(volume, READ, e, begin); + throw e; + } + } + + /** + * {@inheritDoc}. 
+ */ + @Override + public int read(@Nonnull byte[] b) throws IOException { + final long begin = eventHooks.beforeFileIo(volume, READ, b.length); + try { + int numBytesRead = super.read(b); + eventHooks.afterFileIo(volume, READ, begin, numBytesRead); + return numBytesRead; + } catch(Exception e) { + eventHooks.onFailure(volume, READ, e, begin); + throw e; + } + } + + /** + * {@inheritDoc}. + */ + @Override + public int read(@Nonnull byte[] b, int off, int len) throws IOException { + final long begin = eventHooks.beforeFileIo(volume, READ, len); + try { + int numBytesRead = super.read(b, off, len); + eventHooks.afterFileIo(volume, READ, begin, numBytesRead); + return numBytesRead; + } catch(Exception e) { + eventHooks.onFailure(volume, READ, e, begin); + throw e; + } + } + } + + /** + * A thin wrapper over {@link FileOutputStream} that allows + * instrumenting disk IO. + */ + private final class WrappedFileOutputStream extends FileOutputStream { + private @Nullable final FsVolumeSpi volume; + + /** + * {@inheritDoc}. + */ + private WrappedFileOutputStream( + @Nullable FsVolumeSpi volume, File f, + boolean append) throws FileNotFoundException { + super(f, append); + this.volume = volume; + } + + /** + * {@inheritDoc}. + */ + private WrappedFileOutputStream( + @Nullable FsVolumeSpi volume, FileDescriptor fd) { + super(fd); + this.volume = volume; + } + + /** + * {@inheritDoc}. + */ + @Override + public void write(int b) throws IOException { + final long begin = eventHooks.beforeFileIo(volume, WRITE, 1); + try { + super.write(b); + eventHooks.afterFileIo(volume, WRITE, begin, 1); + } catch(Exception e) { + eventHooks.onFailure(volume, WRITE, e, begin); + throw e; + } + } + + /** + * {@inheritDoc}. + */ + @Override + public void write(@Nonnull byte[] b) throws IOException { + final long begin = eventHooks.beforeFileIo(volume, WRITE, b.length); + try { + super.write(b); + eventHooks.afterFileIo(volume, WRITE, begin, b.length); + } catch(Exception e) { + eventHooks.onFailure(volume, WRITE, e, begin); + throw e; + } + } + + /** + * {@inheritDoc}. + */ + @Override + public void write(@Nonnull byte[] b, int off, int len) throws IOException { + final long begin = eventHooks.beforeFileIo(volume, WRITE, len); + try { + super.write(b, off, len); + eventHooks.afterFileIo(volume, WRITE, begin, len); + } catch(Exception e) { + eventHooks.onFailure(volume, WRITE, e, begin); + throw e; + } + } + } + + /** + * A thin wrapper over {@link RandomAccessFile} that allows + * instrumenting IO. 
+ */ + private final class WrappedRandomAccessFile extends RandomAccessFile { + private @Nullable final FsVolumeSpi volume; + + public WrappedRandomAccessFile( + @Nullable FsVolumeSpi volume, File f, String mode) + throws FileNotFoundException { + super(f, mode); + this.volume = volume; + } + + @Override + public int read() throws IOException { + final long begin = eventHooks.beforeFileIo(volume, READ, 1); + try { + int b = super.read(); + eventHooks.afterFileIo(volume, READ, begin, 1); + return b; + } catch(Exception e) { + eventHooks.onFailure(volume, READ, e, begin); + throw e; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + final long begin = eventHooks.beforeFileIo(volume, READ, len); + try { + int numBytesRead = super.read(b, off, len); + eventHooks.afterFileIo(volume, READ, begin, numBytesRead); + return numBytesRead; + } catch(Exception e) { + eventHooks.onFailure(volume, READ, e, begin); + throw e; + } + } + + @Override + public int read(byte[] b) throws IOException { + final long begin = eventHooks.beforeFileIo(volume, READ, b.length); + try { + int numBytesRead = super.read(b); + eventHooks.afterFileIo(volume, READ, begin, numBytesRead); + return numBytesRead; + } catch(Exception e) { + eventHooks.onFailure(volume, READ, e, begin); + throw e; + } + } + + @Override + public void write(int b) throws IOException { + final long begin = eventHooks.beforeFileIo(volume, WRITE, 1); + try { + super.write(b); + eventHooks.afterFileIo(volume, WRITE, begin, 1); + } catch(Exception e) { + eventHooks.onFailure(volume, WRITE, e, begin); + throw e; + } + } + + @Override + public void write(@Nonnull byte[] b) throws IOException { + final long begin = eventHooks.beforeFileIo(volume, WRITE, b.length); + try { + super.write(b); + eventHooks.afterFileIo(volume, WRITE, begin, b.length); + } catch(Exception e) { + eventHooks.onFailure(volume, WRITE, e, begin); + throw e; + } + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + final long begin = eventHooks.beforeFileIo(volume, WRITE, len); + try { + super.write(b, off, len); + eventHooks.afterFileIo(volume, WRITE, begin, len); + } catch(Exception e) { + eventHooks.onFailure(volume, WRITE, e, begin); + throw e; + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java index e6f7e12..1d46ddd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java @@ -29,17 +29,13 @@ import java.net.URI; import java.util.HashMap; import java.util.Map; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo; import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.DataChecksum; @@ -187,20 +183,23 @@ abstract public class LocalReplica extends ReplicaInfo { * be recovered (especially on Windows) on datanode restart. */ private void breakHardlinks(File file, Block b) throws IOException { - File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file)); - try (FileInputStream in = new FileInputStream(file)) { - try (FileOutputStream out = new FileOutputStream(tmpFile)){ - copyBytes(in, out, 16 * 1024); + final FileIoProvider fileIoProvider = getFileIoProvider(); + final File tmpFile = DatanodeUtil.createFileWithExistsCheck( + getVolume(), b, DatanodeUtil.getUnlinkTmpFile(file), fileIoProvider); + try (FileInputStream in = fileIoProvider.getFileInputStream( + getVolume(), file)) { + try (FileOutputStream out = fileIoProvider.getFileOutputStream( + getVolume(), tmpFile)) { + IOUtils.copyBytes(in, out, 16 * 1024); } if (file.length() != tmpFile.length()) { throw new IOException("Copy of file " + file + " size " + file.length()+ " into file " + tmpFile + " resulted in a size of " + tmpFile.length()); } - replaceFile(tmpFile, file); + fileIoProvider.replaceFile(getVolume(), tmpFile, file); } catch (IOException e) { - boolean done = tmpFile.delete(); - if (!done) { + if (!fileIoProvider.delete(getVolume(), tmpFile)) { DataNode.LOG.info("detachFile failed to delete temporary file " + tmpFile); } @@ -226,19 +225,20 @@ abstract public class LocalReplica extends ReplicaInfo { * @throws IOException */ public boolean breakHardLinksIfNeeded() throws IOException { - File file = getBlockFile(); + final File file = getBlockFile(); + final FileIoProvider fileIoProvider = getFileIoProvider(); if (file == null || getVolume() == null) { throw new IOException("detachBlock:Block not found. 
" + this); } File meta = getMetaFile(); - int linkCount = getHardLinkCount(file); + int linkCount = fileIoProvider.getHardLinkCount(getVolume(), file); if (linkCount > 1) { DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " + "block " + this); breakHardlinks(file, this); } - if (getHardLinkCount(meta) > 1) { + if (fileIoProvider.getHardLinkCount(getVolume(), meta) > 1) { breakHardlinks(meta, this); } return true; @@ -256,17 +256,18 @@ abstract public class LocalReplica extends ReplicaInfo { @Override public OutputStream getDataOutputStream(boolean append) throws IOException { - return new FileOutputStream(getBlockFile(), append); + return getFileIoProvider().getFileOutputStream( + getVolume(), getBlockFile(), append); } @Override public boolean blockDataExists() { - return getBlockFile().exists(); + return getFileIoProvider().exists(getVolume(), getBlockFile()); } @Override public boolean deleteBlockData() { - return fullyDelete(getBlockFile()); + return getFileIoProvider().fullyDelete(getVolume(), getBlockFile()); } @Override @@ -282,9 +283,10 @@ abstract public class LocalReplica extends ReplicaInfo { @Override public LengthInputStream getMetadataInputStream(long offset) throws IOException { - File meta = getMetaFile(); + final File meta = getMetaFile(); return new LengthInputStream( - FsDatasetUtil.openAndSeek(meta, offset), meta.length()); + getFileIoProvider().openAndSeek(getVolume(), meta, offset), + meta.length()); } @Override @@ -295,12 +297,12 @@ abstract public class LocalReplica extends ReplicaInfo { @Override public boolean metadataExists() { - return getMetaFile().exists(); + return getFileIoProvider().exists(getVolume(), getMetaFile()); } @Override public boolean deleteMetadata() { - return fullyDelete(getMetaFile()); + return getFileIoProvider().fullyDelete(getVolume(), getMetaFile()); } @Override @@ -320,7 +322,7 @@ abstract public class LocalReplica extends ReplicaInfo { private boolean renameFile(File srcfile, File destfile) throws IOException { try { - rename(srcfile, destfile); + getFileIoProvider().rename(getVolume(), srcfile, destfile); return true; } catch (IOException e) { throw new IOException("Failed to move block file for " + this @@ -360,9 +362,9 @@ abstract public class LocalReplica extends ReplicaInfo { @Override public void bumpReplicaGS(long newGS) throws IOException { long oldGS = getGenerationStamp(); - File oldmeta = getMetaFile(); + final File oldmeta = getMetaFile(); setGenerationStamp(newGS); - File newmeta = getMetaFile(); + final File newmeta = getMetaFile(); // rename meta file to new GS if (LOG.isDebugEnabled()) { @@ -370,7 +372,7 @@ abstract public class LocalReplica extends ReplicaInfo { } try { // calling renameMeta on the ReplicaInfo doesn't work here - rename(oldmeta, newmeta); + getFileIoProvider().rename(getVolume(), oldmeta, newmeta); } catch (IOException e) { setGenerationStamp(oldGS); // restore old GS throw new IOException("Block " + this + " reopen failed. 
" + @@ -381,7 +383,8 @@ abstract public class LocalReplica extends ReplicaInfo { @Override public void truncateBlock(long newLength) throws IOException { - truncateBlock(getBlockFile(), getMetaFile(), getNumBytes(), newLength); + truncateBlock(getVolume(), getBlockFile(), getMetaFile(), + getNumBytes(), newLength, getFileIoProvider()); } @Override @@ -392,32 +395,15 @@ abstract public class LocalReplica extends ReplicaInfo { @Override public void copyMetadata(URI destination) throws IOException { //for local replicas, we assume the destination URI is file - nativeCopyFileUnbuffered(getMetaFile(), new File(destination), true); + getFileIoProvider().nativeCopyFileUnbuffered( + getVolume(), getMetaFile(), new File(destination), true); } @Override public void copyBlockdata(URI destination) throws IOException { //for local replicas, we assume the destination URI is file - nativeCopyFileUnbuffered(getBlockFile(), new File(destination), true); - } - - public void renameMeta(File newMetaFile) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Renaming " + getMetaFile() + " to " + newMetaFile); - } - renameFile(getMetaFile(), newMetaFile); - } - - public void renameBlock(File newBlockFile) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Renaming " + getBlockFile() + " to " + newBlockFile - + ", file length=" + getBlockFile().length()); - } - renameFile(getBlockFile(), newBlockFile); - } - - public static void rename(File from, File to) throws IOException { - Storage.rename(from, to); + getFileIoProvider().nativeCopyFileUnbuffered( + getVolume(), getBlockFile(), new File(destination), true); } /** @@ -430,11 +416,13 @@ abstract public class LocalReplica extends ReplicaInfo { private FileInputStream getDataInputStream(File f, long seekOffset) throws IOException { FileInputStream fis; + final FileIoProvider fileIoProvider = getFileIoProvider(); if (NativeIO.isAvailable()) { - fis = NativeIO.getShareDeleteFileInputStream(f, seekOffset); + fis = fileIoProvider.getShareDeleteFileInputStream( + getVolume(), f, seekOffset); } else { try { - fis = FsDatasetUtil.openAndSeek(f, seekOffset); + fis = fileIoProvider.openAndSeek(getVolume(), f, seekOffset); } catch (FileNotFoundException fnfe) { throw new IOException("Expected block file at " + f + " does not exist."); @@ -443,30 +431,6 @@ abstract public class LocalReplica extends ReplicaInfo { return fis; } - private void nativeCopyFileUnbuffered(File srcFile, File destFile, - boolean preserveFileDate) throws IOException { - Storage.nativeCopyFileUnbuffered(srcFile, destFile, preserveFileDate); - } - - private void copyBytes(InputStream in, OutputStream out, int - buffSize) throws IOException{ - IOUtils.copyBytes(in, out, buffSize); - } - - private void replaceFile(File src, File target) throws IOException { - FileUtil.replaceFile(src, target); - } - - public static boolean fullyDelete(final File dir) { - boolean result = DataStorage.fullyDelete(dir); - return result; - } - - public static int getHardLinkCount(File fileName) throws IOException { - int linkCount = HardLink.getLinkCount(fileName); - return linkCount; - } - /** * Get pin status of a file by checking the sticky bit. 
* @param localFS local file system @@ -495,8 +459,10 @@ abstract public class LocalReplica extends ReplicaInfo { localFS.setPermission(path, permission); } - public static void truncateBlock(File blockFile, File metaFile, - long oldlen, long newlen) throws IOException { + public static void truncateBlock( + FsVolumeSpi volume, File blockFile, File metaFile, + long oldlen, long newlen, FileIoProvider fileIoProvider) + throws IOException { LOG.info("truncateBlock: blockFile=" + blockFile + ", metaFile=" + metaFile + ", oldlen=" + oldlen @@ -510,7 +476,10 @@ abstract public class LocalReplica extends ReplicaInfo { + ") to newlen (=" + newlen + ")"); } - DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum(); + // fis is closed by BlockMetadataHeader.readHeader. + final FileInputStream fis = fileIoProvider.getFileInputStream( + volume, metaFile); + DataChecksum dcs = BlockMetadataHeader.readHeader(fis).getChecksum(); int checksumsize = dcs.getChecksumSize(); int bpc = dcs.getBytesPerChecksum(); long n = (newlen - 1)/bpc + 1; @@ -519,16 +488,14 @@ abstract public class LocalReplica extends ReplicaInfo { int lastchunksize = (int)(newlen - lastchunkoffset); byte[] b = new byte[Math.max(lastchunksize, checksumsize)]; - RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw"); - try { + try (RandomAccessFile blockRAF = fileIoProvider.getRandomAccessFile( + volume, blockFile, "rw")) { //truncate blockFile blockRAF.setLength(newlen); //read last chunk blockRAF.seek(lastchunkoffset); blockRAF.readFully(b, 0, lastchunksize); - } finally { - blockRAF.close(); } //compute checksum @@ -536,13 +503,11 @@ abstract public class LocalReplica extends ReplicaInfo { dcs.writeValue(b, 0, false); //update metaFile - RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw"); - try { + try (RandomAccessFile metaRAF = fileIoProvider.getRandomAccessFile( + volume, metaFile, "rw")) { metaRAF.setLength(newmetalen); metaRAF.seek(newmetalen - checksumsize); metaRAF.write(b, 0, checksumsize); - } finally { - metaRAF.close(); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java index 1387155..003f96f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java @@ -245,10 +245,9 @@ public class LocalReplicaInPipeline extends LocalReplica @Override // ReplicaInPipeline public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum, long slowLogThresholdMs) - throws IOException { - File blockFile = getBlockFile(); - File metaFile = getMetaFile(); + DataChecksum requestedChecksum) throws IOException { + final File blockFile = getBlockFile(); + final File metaFile = getMetaFile(); if (DataNode.LOG.isDebugEnabled()) { DataNode.LOG.debug("writeTo blockfile is " + blockFile + " of size " + blockFile.length()); @@ -262,14 +261,16 @@ public class LocalReplicaInPipeline extends LocalReplica // may differ from requestedChecksum for appends. 
final DataChecksum checksum; - RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw"); + final RandomAccessFile metaRAF = + getFileIoProvider().getRandomAccessFile(getVolume(), metaFile, "rw"); if (!isCreate) { // For append or recovery, we must enforce the existing checksum. // Also, verify that the file has correct lengths, etc. boolean checkedMeta = false; try { - BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF); + BlockMetadataHeader header = + BlockMetadataHeader.readHeader(metaRAF); checksum = header.getChecksum(); if (checksum.getBytesPerChecksum() != @@ -302,20 +303,24 @@ public class LocalReplicaInPipeline extends LocalReplica checksum = requestedChecksum; } + final FileIoProvider fileIoProvider = getFileIoProvider(); FileOutputStream blockOut = null; FileOutputStream crcOut = null; try { - blockOut = new FileOutputStream( - new RandomAccessFile(blockFile, "rw").getFD()); - crcOut = new FileOutputStream(metaRAF.getFD()); + blockOut = fileIoProvider.getFileOutputStream( + getVolume(), + fileIoProvider.getRandomAccessFile(getVolume(), blockFile, "rw") + .getFD()); + crcOut = fileIoProvider.getFileOutputStream(getVolume(), metaRAF.getFD()); if (!isCreate) { blockOut.getChannel().position(blockDiskSize); crcOut.getChannel().position(crcDiskSize); } return new ReplicaOutputStreams(blockOut, crcOut, checksum, - getVolume().isTransientStorage(), slowLogThresholdMs); + getVolume(), fileIoProvider); } catch (IOException e) { IOUtils.closeStream(blockOut); + IOUtils.closeStream(crcOut); IOUtils.closeStream(metaRAF); throw e; } @@ -326,11 +331,11 @@ public class LocalReplicaInPipeline extends LocalReplica File blockFile = getBlockFile(); File restartMeta = new File(blockFile.getParent() + File.pathSeparator + "." + blockFile.getName() + ".restart"); - if (restartMeta.exists() && !restartMeta.delete()) { + if (!getFileIoProvider().deleteWithExistsCheck(getVolume(), restartMeta)) { DataNode.LOG.warn("Failed to delete restart meta file: " + restartMeta.getPath()); } - return new FileOutputStream(restartMeta); + return getFileIoProvider().getFileOutputStream(getVolume(), restartMeta); } @Override @@ -373,12 +378,14 @@ public class LocalReplicaInPipeline extends LocalReplica + " should be derived from LocalReplica"); } - LocalReplica oldReplica = (LocalReplica) oldReplicaInfo; - File oldmeta = oldReplica.getMetaFile(); - File newmeta = getMetaFile(); + final LocalReplica oldReplica = (LocalReplica) oldReplicaInfo; + final File oldBlockFile = oldReplica.getBlockFile(); + final File oldmeta = oldReplica.getMetaFile(); + final File newmeta = getMetaFile(); + final FileIoProvider fileIoProvider = getFileIoProvider(); try { - oldReplica.renameMeta(newmeta); + fileIoProvider.rename(getVolume(), oldmeta, newmeta); } catch (IOException e) { throw new IOException("Block " + oldReplicaInfo + " reopen failed. 
" + " Unable to move meta file " + oldmeta + @@ -386,10 +393,10 @@ public class LocalReplicaInPipeline extends LocalReplica } try { - oldReplica.renameBlock(newBlkFile); + fileIoProvider.rename(getVolume(), oldBlockFile, newBlkFile); } catch (IOException e) { try { - renameMeta(oldmeta); + fileIoProvider.rename(getVolume(), newmeta, oldmeta); } catch (IOException ex) { LOG.warn("Cannot move meta file " + newmeta + "back to the finalized directory " + oldmeta, ex); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java index 5fdbec0..efa6ea6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java @@ -69,13 +69,11 @@ public interface ReplicaInPipeline extends Replica { * * @param isCreate if it is for creation * @param requestedChecksum the checksum the writer would prefer to use - * @param slowLogThresholdMs slow io threshold for logging * @return output streams for writing * @throws IOException if any error occurs */ public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum, long slowLogThresholdMs) - throws IOException; + DataChecksum requestedChecksum) throws IOException; /** * Create an output stream to write restart metadata in case of datanode --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

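----------------------------------------------------------------------
Every FileIoProvider method in this patch uses the same bracket: fetch a begin timestamp from the before-hook, run the underlying java.io or Storage call, then report either success (afterMetadataOp/afterFileIo) or failure (onFailure) and rethrow. The sketch below is a minimal, self-contained illustration of that bracket; Op, Hooks, CountingHooks and HookPatternDemo are hypothetical stand-ins whose shapes are inferred from the eventHooks call sites in this diff, not the FileIoEvents/CountingFileIoEvents classes the commit actually adds.

import java.io.File;
import java.io.IOException;
import java.util.EnumMap;
import java.util.concurrent.atomic.AtomicLong;

public class HookPatternDemo {

  // Hypothetical stand-ins for the hook contract inferred from the
  // eventHooks call sites in the patch.
  enum Op { MKDIRS, LIST, READ, WRITE }

  interface Hooks {
    long beforeOp(Op op);                           // returns a begin timestamp
    void afterOp(Op op, long begin);                // success path
    void onFailure(Op op, Exception e, long begin); // failure path
  }

  // Roughly what a counting implementation might do: one success and one
  // failure counter per operation type.
  static class CountingHooks implements Hooks {
    private final EnumMap<Op, AtomicLong> ok = new EnumMap<>(Op.class);
    private final EnumMap<Op, AtomicLong> failed = new EnumMap<>(Op.class);

    CountingHooks() {
      for (Op op : Op.values()) {
        ok.put(op, new AtomicLong());
        failed.put(op, new AtomicLong());
      }
    }

    @Override public long beforeOp(Op op) { return System.nanoTime(); }
    @Override public void afterOp(Op op, long begin) { ok.get(op).incrementAndGet(); }
    @Override public void onFailure(Op op, Exception e, long begin) {
      failed.get(op).incrementAndGet();
    }

    long successes(Op op) { return ok.get(op).get(); }
  }

  private static final CountingHooks HOOKS = new CountingHooks();

  // Same bracket structure as the patched mkdirs(): before-hook, underlying
  // call, success hook; failures are reported to the hooks and rethrown.
  static boolean instrumentedMkdirs(File dir) throws IOException {
    final long begin = HOOKS.beforeOp(Op.MKDIRS);
    boolean created;
    boolean isDirectory;
    try {
      created = dir.mkdirs();
      isDirectory = !created && dir.isDirectory();
      HOOKS.afterOp(Op.MKDIRS, begin);
    } catch (Exception e) {
      HOOKS.onFailure(Op.MKDIRS, e, begin);
      throw e;
    }
    if (!created && !isDirectory) {
      throw new IOException("Mkdirs failed to create " + dir);
    }
    return created;
  }

  public static void main(String[] args) throws IOException {
    instrumentedMkdirs(new File(System.getProperty("java.io.tmpdir"), "hook-pattern-demo"));
    System.out.println("mkdirs successes: " + HOOKS.successes(Op.MKDIRS));
  }
}

Note that the success hook runs inside the try block rather than in a finally clause, so a failed operation is reported only through onFailure.

The WrappedFileInputStream, WrappedFileOutputStream and WrappedRandomAccessFile inner classes apply the same idea to individual reads and writes: each overridden overload reports the byte count to the hooks around the super call. The committed wrappers subclass the java.io types directly so existing callers keep the concrete FileInputStream/FileOutputStream/RandomAccessFile they expect; the self-contained sketch below uses plain delegation over InputStream instead, and CountingInputStream/WrappedStreamDemo are hypothetical names, not part of the patch.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong;

public class WrappedStreamDemo {

  // Hypothetical: counts the bytes handed back by an underlying stream, the
  // way the wrapped streams report byte counts to the IO hooks. Delegation is
  // used here (instead of subclassing FileInputStream) to stay self-contained.
  static class CountingInputStream extends InputStream {
    private final InputStream in;
    private final AtomicLong bytesRead;

    CountingInputStream(InputStream in, AtomicLong bytesRead) {
      this.in = in;
      this.bytesRead = bytesRead;
    }

    @Override public int read() throws IOException {
      int b = in.read();
      if (b >= 0) {
        bytesRead.incrementAndGet();
      }
      return b;
    }

    @Override public int read(byte[] buf, int off, int len) throws IOException {
      int n = in.read(buf, off, len);
      if (n > 0) {
        bytesRead.addAndGet(n);
      }
      return n;
    }

    @Override public void close() throws IOException {
      in.close();
    }
  }

  public static void main(String[] args) throws IOException {
    AtomicLong counter = new AtomicLong();
    byte[] data = "hello, datanode".getBytes();
    try (InputStream in = new CountingInputStream(new ByteArrayInputStream(data), counter)) {
      byte[] buf = new byte[8];
      while (in.read(buf, 0, buf.length) != -1) {
        // drain the stream
      }
    }
    System.out.println("bytes read: " + counter.get()); // prints 15
  }
}

Running WrappedStreamDemo prints "bytes read: 15", the length of the sample buffer drained through the counting wrapper.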