Repository: hadoop Updated Branches: refs/heads/branch-2 1dc548305 -> 5ec2b6caa
HDFS-7496. Fix FsVolume removal race conditions on the DataNode by reference-counting the volume instances (Lei (Eddy) Xu via Colin P. McCabe) (cherry-picked from commit a17584936cc5141e3f5612ac3ecf35e27968e439) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5ec2b6ca Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5ec2b6ca Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5ec2b6ca Branch: refs/heads/branch-2 Commit: 5ec2b6caa9d63123a88f407f734319d4ac6038a9 Parents: 1dc5483 Author: Colin Patrick Mccabe <[email protected]> Authored: Wed Jan 21 11:38:08 2015 -0800 Committer: Colin Patrick Mccabe <[email protected]> Committed: Wed Jan 21 11:43:42 2015 -0800 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hdfs/server/datanode/BlockReceiver.java | 59 ++--- .../hdfs/server/datanode/BlockSender.java | 10 + .../hdfs/server/datanode/ReplicaHandler.java | 49 ++++ .../server/datanode/fsdataset/FsDatasetSpi.java | 13 +- .../datanode/fsdataset/FsVolumeReference.java | 48 ++++ .../server/datanode/fsdataset/FsVolumeSpi.java | 10 + .../datanode/fsdataset/ReplicaInputStreams.java | 6 +- .../impl/FsDatasetAsyncDiskService.java | 16 +- .../datanode/fsdataset/impl/FsDatasetImpl.java | 265 ++++++++++++------- .../datanode/fsdataset/impl/FsVolumeImpl.java | 121 ++++++++- .../datanode/fsdataset/impl/FsVolumeList.java | 76 +++++- .../impl/RamDiskAsyncLazyPersistService.java | 16 +- .../src/main/proto/datatransfer.proto | 2 +- .../hdfs/TestWriteBlockGetsBlockLengthHint.java | 2 +- .../server/datanode/SimulatedFSDataset.java | 35 ++- .../hdfs/server/datanode/TestBlockRecovery.java | 2 +- .../datanode/TestDataNodeHotSwapVolumes.java | 33 ++- .../server/datanode/TestDirectoryScanner.java | 9 +- .../server/datanode/TestSimulatedFSDataset.java | 2 +- .../fsdataset/impl/TestFsDatasetImpl.java | 7 +- 
.../fsdataset/impl/TestWriteToReplica.java | 12 +- 22 files changed, 596 insertions(+), 200 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 5fda4d6..c4f5236 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -466,6 +466,9 @@ Release 2.7.0 - UNRELEASED HDFS-7641. Update archival storage user doc for list/set/get block storage policies. (yliu) + HDFS-7496. Fix FsVolume removal race conditions on the DataNode by + reference-counting the volume instances (lei via cmccabe) + HDFS-7610. Fix removal of dynamically added DN volumes (Lei (Eddy) Xu via Colin P. McCabe) http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 08c96be..6e37c23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -26,7 +26,6 @@ import java.io.DataOutputStream; import java.io.File; import java.io.FileDescriptor; import java.io.FileOutputStream; -import java.io.FileWriter; import java.io.IOException; import java.io.OutputStream; import java.io.OutputStreamWriter; @@ -49,10 +48,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; 
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; -import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; @@ -125,6 +122,8 @@ class BlockReceiver implements Closeable { private boolean syncOnClose; private long restartBudget; + /** the reference of the volume where the block receiver writes to */ + private ReplicaHandler replicaHandler; /** * for replaceBlock response @@ -179,48 +178,50 @@ class BlockReceiver implements Closeable { // Open local disk out // if (isDatanode) { //replication or move - replicaInfo = datanode.data.createTemporary(storageType, block); + replicaHandler = datanode.data.createTemporary(storageType, block); } else { switch (stage) { case PIPELINE_SETUP_CREATE: - replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist); + replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist); datanode.notifyNamenodeReceivingBlock( - block, replicaInfo.getStorageUuid()); + block, replicaHandler.getReplica().getStorageUuid()); break; case PIPELINE_SETUP_STREAMING_RECOVERY: - replicaInfo = datanode.data.recoverRbw( + replicaHandler = datanode.data.recoverRbw( block, newGs, minBytesRcvd, maxBytesRcvd); block.setGenerationStamp(newGs); break; case PIPELINE_SETUP_APPEND: - replicaInfo = datanode.data.append(block, newGs, minBytesRcvd); + replicaHandler = datanode.data.append(block, newGs, minBytesRcvd); if (datanode.blockScanner != null) { // remove from block 
scanner datanode.blockScanner.deleteBlock(block.getBlockPoolId(), block.getLocalBlock()); } block.setGenerationStamp(newGs); datanode.notifyNamenodeReceivingBlock( - block, replicaInfo.getStorageUuid()); + block, replicaHandler.getReplica().getStorageUuid()); break; case PIPELINE_SETUP_APPEND_RECOVERY: - replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd); + replicaHandler = datanode.data.recoverAppend(block, newGs, minBytesRcvd); if (datanode.blockScanner != null) { // remove from block scanner datanode.blockScanner.deleteBlock(block.getBlockPoolId(), block.getLocalBlock()); } block.setGenerationStamp(newGs); datanode.notifyNamenodeReceivingBlock( - block, replicaInfo.getStorageUuid()); + block, replicaHandler.getReplica().getStorageUuid()); break; case TRANSFER_RBW: case TRANSFER_FINALIZED: // this is a transfer destination - replicaInfo = datanode.data.createTemporary(storageType, block); + replicaHandler = + datanode.data.createTemporary(storageType, block); break; default: throw new IOException("Unsupported stage " + stage + " while receiving block " + block + " from " + inAddr); } } + replicaInfo = replicaHandler.getReplica(); this.dropCacheBehindWrites = (cachingStrategy.getDropBehind() == null) ? 
datanode.getDnConf().dropCacheBehindWrites : cachingStrategy.getDropBehind(); @@ -339,6 +340,10 @@ class BlockReceiver implements Closeable { finally{ IOUtils.closeStream(out); } + if (replicaHandler != null) { + IOUtils.cleanup(null, replicaHandler); + replicaHandler = null; + } if (measuredFlushTime) { datanode.metrics.addFlushNanos(flushTotalNanos); } @@ -950,15 +955,12 @@ class BlockReceiver implements Closeable { // byte[] buf = new byte[sizePartialChunk]; byte[] crcbuf = new byte[checksumSize]; - ReplicaInputStreams instr = null; - try { - instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff); + try (ReplicaInputStreams instr = + datanode.data.getTmpInputStreams(block, blkoff, ckoff)) { IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk); // open meta file and read in crc value computer earlier IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length); - } finally { - IOUtils.closeStream(instr); } // compute crc of partial chunk from data read in the block file. @@ -1244,28 +1246,7 @@ class BlockReceiver implements Closeable { if (lastPacketInBlock) { // Finalize the block and close the block file - try { - finalizeBlock(startTime); - } catch (ReplicaNotFoundException e) { - // Verify that the exception is due to volume removal. - FsVolumeSpi volume; - synchronized (datanode.data) { - volume = datanode.data.getVolume(block); - } - if (volume == null) { - // ReplicaInfo has been removed due to the corresponding data - // volume has been removed. Don't need to check disk error. 
- LOG.info(myString - + ": BlockReceiver is interrupted because the block pool " - + block.getBlockPoolId() + " has been removed.", e); - sendAckUpstream(ack, expected, totalAckTimeNanos, 0, - Status.OOB_INTERRUPTED); - running = false; - receiverThread.interrupt(); - continue; - } - throw e; - } + finalizeBlock(startTime); } sendAckUpstream(ack, expected, totalAckTimeNanos, http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 27d3e5c..182b366 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; @@ -143,6 +144,8 @@ class BlockSender implements java.io.Closeable { /** The file descriptor of the block being sent */ private FileDescriptor blockInFd; + /** The reference to the volume where the block is located */ + private FsVolumeReference volumeRef; // Cache-management related fields private final long readaheadLength; @@ -257,6 +260,9 @@ class BlockSender implements java.io.Closeable { this.transferToAllowed = 
datanode.getDnConf().transferToAllowed && (!is32Bit || length <= Integer.MAX_VALUE); + // Obtain a reference before reading data + this.volumeRef = datanode.data.getVolume(block).obtainReference(); + /* * (corruptChecksumOK, meta_file_exist): operation * True, True: will verify checksum @@ -420,6 +426,10 @@ class BlockSender implements java.io.Closeable { blockIn = null; blockInFd = null; } + if (volumeRef != null) { + IOUtils.cleanup(null, volumeRef); + volumeRef = null; + } // throw IOException if there is any if(ioe!= null) { throw ioe; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java new file mode 100644 index 0000000..b563d7f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; + +import java.io.Closeable; +import java.io.IOException; + +/** + * This class includes a replica being actively written and the reference to + * the fs volume where this replica is located. + */ +public class ReplicaHandler implements Closeable { + private final ReplicaInPipelineInterface replica; + private final FsVolumeReference volumeReference; + + public ReplicaHandler( + ReplicaInPipelineInterface replica, FsVolumeReference reference) { + this.replica = replica; + this.volumeReference = reference; + } + + @Override + public void close() throws IOException { + if (this.volumeReference != null) { + volumeReference.close(); + } + } + + public ReplicaInPipelineInterface getReplica() { + return replica; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 462ad31..d218146 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.Replica; import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface; +import 
org.apache.hadoop.hdfs.server.datanode.ReplicaHandler; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; @@ -198,7 +199,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean { * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - public ReplicaInPipelineInterface createTemporary(StorageType storageType, + public ReplicaHandler createTemporary(StorageType storageType, ExtendedBlock b) throws IOException; /** @@ -208,7 +209,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean { * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - public ReplicaInPipelineInterface createRbw(StorageType storageType, + public ReplicaHandler createRbw(StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) throws IOException; /** @@ -221,7 +222,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean { * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - public ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, + public ReplicaHandler recoverRbw(ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException; /** @@ -241,7 +242,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean { * @return the meata info of the replica which is being written to * @throws IOException */ - public ReplicaInPipelineInterface append(ExtendedBlock b, long newGS, + public ReplicaHandler append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException; /** @@ -254,8 +255,8 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean { * @return the meta info of the replica which is being written to * @throws IOException */ - 
public ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, long newGS, - long expectedBlockLen) throws IOException; + public ReplicaHandler recoverAppend( + ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException; /** * Recover a failed pipeline close http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeReference.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeReference.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeReference.java new file mode 100644 index 0000000..e61a059 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeReference.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset; + +import java.io.Closeable; +import java.io.IOException; + +/** + * This is the interface for holding reference count as AutoClosable resource. 
+ * It increases the reference count by one in the constructor, and decreases + * the reference count by one in {@link #close()}. + * + * <pre> + * {@code + * try (FsVolumeReference ref = volume.obtainReference()) { + * // Do IOs on the volume + * volume.createRbw(...); + * ... + * } + * } + * </pre> + */ +public interface FsVolumeReference extends Closeable { + /** + * Decrease the reference count of the volume. + * @throws IOException it never throws IOException. + */ + @Override + public void close() throws IOException; + + /** Returns the underlying volume object */ + public FsVolumeSpi getVolume(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java index 4f45922..d9c37cb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.File; import java.io.IOException; +import java.nio.channels.ClosedChannelException; import org.apache.hadoop.hdfs.StorageType; @@ -26,6 +27,15 @@ import org.apache.hadoop.hdfs.StorageType; * This is an interface for the underlying volume. */ public interface FsVolumeSpi { + /** + * Obtain a reference object that had increased 1 reference count of the + * volume. + * + * It is caller's responsibility to close {@link FsVolumeReference} to decrease + * the reference count on the volume. 
+ */ + FsVolumeReference obtainReference() throws ClosedChannelException; + /** @return the StorageUuid of the volume */ public String getStorageID(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java index 6bd7199..a8bf622 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java @@ -30,9 +30,12 @@ import org.apache.hadoop.io.IOUtils; public class ReplicaInputStreams implements Closeable { private final InputStream dataIn; private final InputStream checksumIn; + private final FsVolumeReference volumeRef; /** Create an object with a data input stream and a checksum input stream. 
*/ - public ReplicaInputStreams(FileDescriptor dataFd, FileDescriptor checksumFd) { + public ReplicaInputStreams(FileDescriptor dataFd, FileDescriptor checksumFd, + FsVolumeReference volumeRef) { + this.volumeRef = volumeRef; this.dataIn = new FileInputStream(dataFd); this.checksumIn = new FileInputStream(checksumFd); } @@ -51,5 +54,6 @@ public class ReplicaInputStreams implements Closeable { public void close() { IOUtils.closeStream(dataIn); IOUtils.closeStream(checksumIn); + IOUtils.cleanup(null, volumeRef); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java index bee7bf7..13e854f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.File; import java.io.FileDescriptor; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; @@ -31,7 +32,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; 
+import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIOException; @@ -200,13 +203,13 @@ class FsDatasetAsyncDiskService { * Delete the block file and meta file from the disk asynchronously, adjust * dfsUsed statistics accordingly. */ - void deleteAsync(FsVolumeImpl volume, File blockFile, File metaFile, + void deleteAsync(FsVolumeReference volumeRef, File blockFile, File metaFile, ExtendedBlock block, String trashDirectory) { LOG.info("Scheduling " + block.getLocalBlock() + " file " + blockFile + " for deletion"); ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask( - volume, blockFile, metaFile, block, trashDirectory); - execute(volume.getCurrentDir(), deletionTask); + volumeRef, blockFile, metaFile, block, trashDirectory); + execute(((FsVolumeImpl) volumeRef.getVolume()).getCurrentDir(), deletionTask); } /** A task for deleting a block file and its associated meta file, as well @@ -216,15 +219,17 @@ class FsDatasetAsyncDiskService { * files are deleted immediately. 
*/ class ReplicaFileDeleteTask implements Runnable { + final FsVolumeReference volumeRef; final FsVolumeImpl volume; final File blockFile; final File metaFile; final ExtendedBlock block; final String trashDirectory; - ReplicaFileDeleteTask(FsVolumeImpl volume, File blockFile, + ReplicaFileDeleteTask(FsVolumeReference volumeRef, File blockFile, File metaFile, ExtendedBlock block, String trashDirectory) { - this.volume = volume; + this.volumeRef = volumeRef; + this.volume = (FsVolumeImpl) volumeRef.getVolume(); this.blockFile = blockFile; this.metaFile = metaFile; this.block = block; @@ -281,6 +286,7 @@ class FsDatasetAsyncDiskService { LOG.info("Deleted " + block.getBlockPoolId() + " " + block.getLocalBlock() + " file " + blockFile); } + IOUtils.cleanup(null, volumeRef); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 934ae9c..ab16f18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Collection; @@ -76,6 +77,7 @@ import org.apache.hadoop.hdfs.server.datanode.Replica; import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException; 
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline; +import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery; @@ -83,6 +85,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; @@ -138,22 +141,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { @Override // FsDatasetSpi public StorageReport[] getStorageReports(String bpid) throws IOException { - StorageReport[] reports; + List<StorageReport> reports; synchronized (statsLock) { List<FsVolumeImpl> curVolumes = getVolumes(); - reports = new StorageReport[curVolumes.size()]; - int i = 0; + reports = new ArrayList<>(curVolumes.size()); for (FsVolumeImpl volume : curVolumes) { - reports[i++] = new StorageReport(volume.toDatanodeStorage(), - false, - volume.getCapacity(), - volume.getDfsUsed(), - volume.getAvailable(), - volume.getBlockPoolUsed(bpid)); + try (FsVolumeReference ref = volume.obtainReference()) { + StorageReport sr = new StorageReport(volume.toDatanodeStorage(), + false, + volume.getCapacity(), + volume.getDfsUsed(), + volume.getAvailable(), + volume.getBlockPoolUsed(bpid)); + reports.add(sr); + } catch (ClosedChannelException e) { + continue; + } } } - return reports; + return 
reports.toArray(new StorageReport[reports.size()]); } @Override @@ -625,17 +632,24 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkOffset, long ckoff) throws IOException { ReplicaInfo info = getReplicaInfo(b); - File blockFile = info.getBlockFile(); - RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r"); - if (blkOffset > 0) { - blockInFile.seek(blkOffset); - } - File metaFile = info.getMetaFile(); - RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r"); - if (ckoff > 0) { - metaInFile.seek(ckoff); + FsVolumeReference ref = info.getVolume().obtainReference(); + try { + File blockFile = info.getBlockFile(); + RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r"); + if (blkOffset > 0) { + blockInFile.seek(blkOffset); + } + File metaFile = info.getMetaFile(); + RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r"); + if (ckoff > 0) { + metaInFile.seek(ckoff); + } + return new ReplicaInputStreams( + blockInFile.getFD(), metaInFile.getFD(), ref); + } catch (IOException e) { + IOUtils.cleanup(null, ref); + throw e; } - return new ReplicaInputStreams(blockInFile.getFD(), metaInFile.getFD()); } static File moveBlockFiles(Block b, File srcfile, File destdir) @@ -729,26 +743,27 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { + replicaInfo.getVolume().getStorageType()); } - FsVolumeImpl targetVolume = volumes.getNextVolume(targetStorageType, - block.getNumBytes()); - File oldBlockFile = replicaInfo.getBlockFile(); - File oldMetaFile = replicaInfo.getMetaFile(); + try (FsVolumeReference volumeRef = volumes.getNextVolume( + targetStorageType, block.getNumBytes())) { + File oldBlockFile = replicaInfo.getBlockFile(); + File oldMetaFile = replicaInfo.getMetaFile(); + FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume(); + // Copy files to temp dir first + File[] blockFiles = 
copyBlockFiles(block.getBlockId(), + block.getGenerationStamp(), oldMetaFile, oldBlockFile, + targetVolume.getTmpDir(block.getBlockPoolId()), + replicaInfo.isOnTransientStorage()); - // Copy files to temp dir first - File[] blockFiles = copyBlockFiles(block.getBlockId(), - block.getGenerationStamp(), oldMetaFile, oldBlockFile, - targetVolume.getTmpDir(block.getBlockPoolId()), - replicaInfo.isOnTransientStorage()); + ReplicaInfo newReplicaInfo = new ReplicaInPipeline( + replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(), + targetVolume, blockFiles[0].getParentFile(), 0); + newReplicaInfo.setNumBytes(blockFiles[1].length()); + // Finalize the copied files + newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo); - ReplicaInfo newReplicaInfo = new ReplicaInPipeline( - replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(), - targetVolume, blockFiles[0].getParentFile(), 0); - newReplicaInfo.setNumBytes(blockFiles[1].length()); - // Finalize the copied files - newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo); - - removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile, - oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId()); + removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile, + oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId()); + } // Replace the old block if any to reschedule the scanning. 
datanode.getBlockScanner().addBlock(block); @@ -867,7 +882,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { @Override // FsDatasetSpi - public synchronized ReplicaInPipeline append(ExtendedBlock b, + public synchronized ReplicaHandler append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { // If the block was successfully finalized because all packets // were successfully processed at the Datanode but the ack for @@ -892,8 +907,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { " expected length is " + expectedBlockLen); } - return append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS, - b.getNumBytes()); + FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); + ReplicaBeingWritten replica = null; + try { + replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS, + b.getNumBytes()); + } catch (IOException e) { + IOUtils.cleanup(null, ref); + throw e; + } + return new ReplicaHandler(replica, ref); } /** Append to a finalized replica @@ -1014,22 +1037,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { return replicaInfo; } - + @Override // FsDatasetSpi - public synchronized ReplicaInPipeline recoverAppend(ExtendedBlock b, - long newGS, long expectedBlockLen) throws IOException { + public synchronized ReplicaHandler recoverAppend( + ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { LOG.info("Recover failed append to " + b); ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); - // change the replica's state/gs etc. - if (replicaInfo.getState() == ReplicaState.FINALIZED ) { - return append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo, newGS, - b.getNumBytes()); - } else { //RBW - bumpReplicaGS(replicaInfo, newGS); - return (ReplicaBeingWritten)replicaInfo; + FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); + ReplicaBeingWritten replica; + try { + // change the replica's state/gs etc. 
+ if (replicaInfo.getState() == ReplicaState.FINALIZED) { + replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo, + newGS, b.getNumBytes()); + } else { //RBW + bumpReplicaGS(replicaInfo, newGS); + replica = (ReplicaBeingWritten) replicaInfo; + } + } catch (IOException e) { + IOUtils.cleanup(null, ref); + throw e; } + return new ReplicaHandler(replica, ref); } @Override // FsDatasetSpi @@ -1077,8 +1108,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } @Override // FsDatasetSpi - public synchronized ReplicaInPipeline createRbw(StorageType storageType, - ExtendedBlock b, boolean allowLazyPersist) throws IOException { + public synchronized ReplicaHandler createRbw( + StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) + throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { @@ -1087,15 +1119,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { " and thus cannot be created."); } // create a new block - FsVolumeImpl v; + FsVolumeReference ref; while (true) { try { if (allowLazyPersist) { // First try to place the block on a transient volume. 
- v = volumes.getNextTransientVolume(b.getNumBytes()); + ref = volumes.getNextTransientVolume(b.getNumBytes()); datanode.getMetrics().incrRamDiskBlocksWrite(); } else { - v = volumes.getNextVolume(storageType, b.getNumBytes()); + ref = volumes.getNextVolume(storageType, b.getNumBytes()); } } catch (DiskOutOfSpaceException de) { if (allowLazyPersist) { @@ -1107,18 +1139,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } break; } + FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); // create an rbw file to hold block in the designated volume - File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); + File f; + try { + f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); + } catch (IOException e) { + IOUtils.cleanup(null, ref); + throw e; + } + ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes()); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); - - return newReplicaInfo; + return new ReplicaHandler(newReplicaInfo, ref); } - + @Override // FsDatasetSpi - public synchronized ReplicaInPipeline recoverRbw(ExtendedBlock b, - long newGS, long minBytesRcvd, long maxBytesRcvd) + public synchronized ReplicaHandler recoverRbw( + ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { LOG.info("Recover RBW replica " + b); @@ -1157,20 +1196,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { minBytesRcvd + ", " + maxBytesRcvd + "]."); } - // Truncate the potentially corrupt portion. - // If the source was client and the last node in the pipeline was lost, - // any corrupt data written after the acked length can go unnoticed. 
- if (numBytes > bytesAcked) { - final File replicafile = rbw.getBlockFile(); - truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked); - rbw.setNumBytes(bytesAcked); - rbw.setLastChecksumAndDataLen(bytesAcked, null); - } + FsVolumeReference ref = rbw.getVolume().obtainReference(); + try { + // Truncate the potentially corrupt portion. + // If the source was client and the last node in the pipeline was lost, + // any corrupt data written after the acked length can go unnoticed. + if (numBytes > bytesAcked) { + final File replicafile = rbw.getBlockFile(); + truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked); + rbw.setNumBytes(bytesAcked); + rbw.setLastChecksumAndDataLen(bytesAcked, null); + } - // bump the replica's generation stamp to newGS - bumpReplicaGS(rbw, newGS); - - return rbw; + // bump the replica's generation stamp to newGS + bumpReplicaGS(rbw, newGS); + } catch (IOException e) { + IOUtils.cleanup(null, ref); + throw e; + } + return new ReplicaHandler(rbw, ref); } @Override // FsDatasetSpi @@ -1235,8 +1279,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } @Override // FsDatasetSpi - public synchronized ReplicaInPipeline createTemporary(StorageType storageType, - ExtendedBlock b) throws IOException { + public synchronized ReplicaHandler createTemporary( + StorageType storageType, ExtendedBlock b) throws IOException { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { if (replicaInfo.getGenerationStamp() < b.getGenerationStamp() @@ -1251,14 +1295,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { " and thus cannot be created."); } } - - FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes()); + + FsVolumeReference ref = volumes.getNextVolume(storageType, b.getNumBytes()); + FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); // create a temporary file to hold block in the designated volume - File f = 
v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); + File f; + try { + f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); + } catch (IOException e) { + IOUtils.cleanup(null, ref); + throw e; + } + ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile(), 0); volumeMap.add(b.getBlockPoolId(), newReplicaInfo); - return newReplicaInfo; + return new ReplicaHandler(newReplicaInfo, ref); } /** @@ -1641,10 +1693,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { // Delete the block asynchronously to make sure we can do it fast enough. // It's ok to unlink the block file before the uncache operation // finishes. - asyncDiskService.deleteAsync(v, f, - FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()), - new ExtendedBlock(bpid, invalidBlks[i]), - dataStorage.getTrashDirectoryForBlockFile(bpid, f)); + try { + asyncDiskService.deleteAsync(v.obtainReference(), f, + FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()), + new ExtendedBlock(bpid, invalidBlks[i]), + dataStorage.getTrashDirectoryForBlockFile(bpid, f)); + } catch (ClosedChannelException e) { + LOG.warn("Volume " + v + " is closed, ignore the deletion task for " + + "block " + invalidBlks[i]); + } } if (!errors.isEmpty()) { StringBuilder b = new StringBuilder("Failed to delete ") @@ -2291,9 +2348,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { for (FsVolumeImpl volume : getVolumes()) { long used = 0; long free = 0; - try { + try (FsVolumeReference ref = volume.obtainReference()) { used = volume.getDfsUsed(); free = volume.getAvailable(); + } catch (ClosedChannelException e) { + continue; } catch (IOException e) { LOG.warn(e.getMessage()); used = 0; @@ -2325,15 +2384,23 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { List<FsVolumeImpl> curVolumes = getVolumes(); if (!force) { for (FsVolumeImpl volume : curVolumes) { - if (!volume.isBPDirEmpty(bpid)) { - LOG.warn(bpid + 
" has some block files, cannot delete unless forced"); - throw new IOException("Cannot delete block pool, " - + "it contains some block files"); + try (FsVolumeReference ref = volume.obtainReference()) { + if (!volume.isBPDirEmpty(bpid)) { + LOG.warn(bpid + " has some block files, cannot delete unless forced"); + throw new IOException("Cannot delete block pool, " + + "it contains some block files"); + } + } catch (ClosedChannelException e) { + // ignore. } } } for (FsVolumeImpl volume : curVolumes) { - volume.deleteBPDirectories(bpid, force); + try (FsVolumeReference ref = volume.obtainReference()) { + volume.deleteBPDirectories(bpid, force); + } catch (ClosedChannelException e) { + // ignore. + } } } @@ -2566,6 +2633,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { */ private boolean saveNextReplica() { RamDiskReplica block = null; + FsVolumeReference targetReference; FsVolumeImpl targetVolume; ReplicaInfo replicaInfo; boolean succeeded = false; @@ -2583,8 +2651,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { if (replicaInfo != null && replicaInfo.getVolume().isTransientStorage()) { // Pick a target volume to persist the block. - targetVolume = volumes.getNextVolume( + targetReference = volumes.getNextVolume( StorageType.DEFAULT, replicaInfo.getNumBytes()); + targetVolume = (FsVolumeImpl) targetReference.getVolume(); ramDiskReplicaTracker.recordStartLazyPersist( block.getBlockPoolId(), block.getBlockId(), targetVolume); @@ -2600,7 +2669,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { block.getBlockPoolId(), block.getBlockId(), replicaInfo.getGenerationStamp(), block.getCreationTime(), replicaInfo.getMetaFile(), replicaInfo.getBlockFile(), - targetVolume); + targetReference); } } } @@ -2624,9 +2693,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { // Don't worry about fragmentation for now. We don't expect more than one // transient volume per DN. 
for (FsVolumeImpl v : getVolumes()) { - if (v.isTransientStorage()) { - capacity += v.getCapacity(); - free += v.getAvailable(); + try (FsVolumeReference ref = v.obtainReference()) { + if (v.isTransientStorage()) { + capacity += v.getCapacity(); + free += v.getAvailable(); + } + } catch (ClosedChannelException e) { + // ignore. } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index a0cfb55..a2d4f2e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.File; import java.io.IOException; +import java.nio.channels.ClosedChannelException; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -31,6 +32,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.DF; @@ -40,8 +43,10 @@ import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; +import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.util.CloseableReferenceCount; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -62,6 +67,7 @@ public class FsVolumeImpl implements FsVolumeSpi { private final File currentDir; // <StorageDirectory>/current private final DF usage; private final long reserved; + private CloseableReferenceCount reference = new CloseableReferenceCount(); // Disk space reserved for open blocks. private AtomicLong reservedForRbw; @@ -99,6 +105,10 @@ public class FsVolumeImpl implements FsVolumeSpi { if (storageType.isTransient()) { return null; } + if (dataset.datanode == null) { + // FsVolumeImpl is used in test. + return null; + } final int maxNumThreads = dataset.datanode.getConf().getInt( DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, @@ -116,7 +126,114 @@ public class FsVolumeImpl implements FsVolumeSpi { executor.allowCoreThreadTimeOut(true); return executor; } - + + private void printReferenceTraceInfo(String op) { + StackTraceElement[] stack = Thread.currentThread().getStackTrace(); + for (StackTraceElement ste : stack) { + switch (ste.getMethodName()) { + case "getDfsUsed": + case "getBlockPoolUsed": + case "getAvailable": + case "getVolumeMap": + return; + default: + break; + } + } + FsDatasetImpl.LOG.trace("Reference count: " + op + " " + this + ": " + + this.reference.getReferenceCount()); + FsDatasetImpl.LOG.trace( + Joiner.on("\n").join(Thread.currentThread().getStackTrace())); + } + + /** + * Increase the reference count. The caller must increase the reference count + * before issuing IOs. + * + * @throws IOException if the volume is already closed. 
+ */ + private void reference() throws ClosedChannelException { + this.reference.reference(); + if (FsDatasetImpl.LOG.isTraceEnabled()) { + printReferenceTraceInfo("incr"); + } + } + + /** + * Decrease the reference count. + */ + private void unreference() { + if (FsDatasetImpl.LOG.isTraceEnabled()) { + printReferenceTraceInfo("desc"); + } + if (FsDatasetImpl.LOG.isDebugEnabled()) { + if (reference.getReferenceCount() <= 0) { + FsDatasetImpl.LOG.debug("Decrease reference count <= 0 on " + this + + Joiner.on("\n").join(Thread.currentThread().getStackTrace())); + } + } + checkReference(); + this.reference.unreference(); + } + + private static class FsVolumeReferenceImpl implements FsVolumeReference { + private final FsVolumeImpl volume; + + FsVolumeReferenceImpl(FsVolumeImpl volume) throws ClosedChannelException { + this.volume = volume; + volume.reference(); + } + + /** + * Decreases the reference count. + * @throws IOException it never throws IOException. + */ + @Override + public void close() throws IOException { + volume.unreference(); + } + + @Override + public FsVolumeSpi getVolume() { + return this.volume; + } + } + + @Override + public FsVolumeReference obtainReference() throws ClosedChannelException { + return new FsVolumeReferenceImpl(this); + } + + private void checkReference() { + Preconditions.checkState(reference.getReferenceCount() > 0); + } + + /** + * Close this volume and wait all other threads to release the reference count + * on this volume. + * @throws IOException if the volume is closed or the waiting is interrupted. 
+ */ + void closeAndWait() throws IOException { + try { + this.reference.setClosed(); + } catch (ClosedChannelException e) { + throw new IOException("The volume has already closed.", e); + } + final int SLEEP_MILLIS = 500; + while (this.reference.getReferenceCount() > 0) { + if (FsDatasetImpl.LOG.isDebugEnabled()) { + FsDatasetImpl.LOG.debug(String.format( + "The reference count for %s is %d, wait to be 0.", + this, reference.getReferenceCount())); + } + try { + Thread.sleep(SLEEP_MILLIS); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + } + File getCurrentDir() { return currentDir; } @@ -250,6 +367,7 @@ public class FsVolumeImpl implements FsVolumeSpi { * the block is finalized. */ File createTmpFile(String bpid, Block b) throws IOException { + checkReference(); return getBlockPoolSlice(bpid).createTmpFile(b); } @@ -282,6 +400,7 @@ public class FsVolumeImpl implements FsVolumeSpi { * the block is finalized. */ File createRbwFile(String bpid, Block b) throws IOException { + checkReference(); reserveSpaceForRbw(b.getNumBytes()); return getBlockPoolSlice(bpid).createRbwFile(b); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index 1c3ccbb..c837593 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import 
java.io.File; import java.io.IOException; +import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; import org.apache.hadoop.util.DiskChecker.DiskErrorException; @@ -59,6 +61,21 @@ class FsVolumeList { return Collections.unmodifiableList(Arrays.asList(volumes.get())); } + private FsVolumeReference chooseVolume(List<FsVolumeImpl> list, long blockSize) + throws IOException { + while (true) { + FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize); + try { + return volume.obtainReference(); + } catch (ClosedChannelException e) { + FsDatasetImpl.LOG.warn("Chosen a closed volume: " + volume); + // blockChooser.chooseVolume returns DiskOutOfSpaceException when the list + // is empty, indicating that all volumes are closed. + list.remove(volume); + } + } + } + /** * Get next volume. * @@ -66,7 +83,7 @@ class FsVolumeList { * @param storageType the desired {@link StorageType} * @return next volume to store the block in. */ - FsVolumeImpl getNextVolume(StorageType storageType, long blockSize) + FsVolumeReference getNextVolume(StorageType storageType, long blockSize) throws IOException { // Get a snapshot of currently available volumes. final FsVolumeImpl[] curVolumes = volumes.get(); @@ -76,7 +93,7 @@ class FsVolumeList { list.add(v); } } - return blockChooser.chooseVolume(list, blockSize); + return chooseVolume(list, blockSize); } /** @@ -85,7 +102,7 @@ class FsVolumeList { * @param blockSize free space needed on the volume * @return next volume to store the block in. 
*/ - FsVolumeImpl getNextTransientVolume(long blockSize) throws IOException { + FsVolumeReference getNextTransientVolume(long blockSize) throws IOException { // Get a snapshot of currently available volumes. final List<FsVolumeImpl> curVolumes = getVolumes(); final List<FsVolumeImpl> list = new ArrayList<>(curVolumes.size()); @@ -94,13 +111,17 @@ class FsVolumeList { list.add(v); } } - return blockChooser.chooseVolume(list, blockSize); + return chooseVolume(list, blockSize); } long getDfsUsed() throws IOException { long dfsUsed = 0L; for (FsVolumeImpl v : volumes.get()) { - dfsUsed += v.getDfsUsed(); + try(FsVolumeReference ref = v.obtainReference()) { + dfsUsed += v.getDfsUsed(); + } catch (ClosedChannelException e) { + // ignore. + } } return dfsUsed; } @@ -108,7 +129,11 @@ class FsVolumeList { long getBlockPoolUsed(String bpid) throws IOException { long dfsUsed = 0L; for (FsVolumeImpl v : volumes.get()) { - dfsUsed += v.getBlockPoolUsed(bpid); + try (FsVolumeReference ref = v.obtainReference()) { + dfsUsed += v.getBlockPoolUsed(bpid); + } catch (ClosedChannelException e) { + // ignore. + } } return dfsUsed; } @@ -116,7 +141,11 @@ class FsVolumeList { long getCapacity() { long capacity = 0L; for (FsVolumeImpl v : volumes.get()) { - capacity += v.getCapacity(); + try (FsVolumeReference ref = v.obtainReference()) { + capacity += v.getCapacity(); + } catch (IOException e) { + // ignore. 
+ } } return capacity; } @@ -124,7 +153,11 @@ class FsVolumeList { long getRemaining() throws IOException { long remaining = 0L; for (FsVolumeSpi vol : volumes.get()) { - remaining += vol.getAvailable(); + try (FsVolumeReference ref = vol.obtainReference()) { + remaining += vol.getAvailable(); + } catch (ClosedChannelException e) { + // ignore + } } return remaining; } @@ -140,7 +173,7 @@ class FsVolumeList { for (final FsVolumeImpl v : volumes.get()) { Thread t = new Thread() { public void run() { - try { + try (FsVolumeReference ref = v.obtainReference()) { FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid + " on volume " + v + "..."); long startTime = Time.monotonicNow(); @@ -148,6 +181,9 @@ class FsVolumeList { long timeTaken = Time.monotonicNow() - startTime; FsDatasetImpl.LOG.info("Time to add replicas to map for block pool" + " " + bpid + " on volume " + v + ": " + timeTaken + "ms"); + } catch (ClosedChannelException e) { + FsDatasetImpl.LOG.info("The volume " + v + " is closed while " + + "addng replicas, ignored."); } catch (IOException ioe) { FsDatasetImpl.LOG.info("Caught exception while adding replicas " + "from " + v + ". 
Will throw later.", ioe); @@ -190,16 +226,21 @@ class FsVolumeList { for(Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) { final FsVolumeImpl fsv = i.next(); - try { + try (FsVolumeReference ref = fsv.obtainReference()) { fsv.checkDirs(); } catch (DiskErrorException e) { - FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ",e); + FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e); if (removedVols == null) { - removedVols = new ArrayList<FsVolumeImpl>(1); + removedVols = new ArrayList<>(1); } removedVols.add(fsv); removeVolume(fsv); numFailedVolumes++; + } catch (ClosedChannelException e) { + FsDatasetImpl.LOG.debug("Caught exception when obtaining " + + "reference count on closed volume", e); + } catch (IOException e) { + FsDatasetImpl.LOG.error("Unexpected IOException", e); } } @@ -222,7 +263,6 @@ class FsVolumeList { * @param newVolume the instance of new FsVolumeImpl. */ void addVolume(FsVolumeImpl newVolume) { - // Make a copy of volumes to add new volumes. 
while (true) { final FsVolumeImpl[] curVolumes = volumes.get(); final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes); @@ -253,6 +293,12 @@ class FsVolumeList { if (volumeList.remove(target)) { if (volumes.compareAndSet(curVolumes, volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) { + try { + target.closeAndWait(); + } catch (IOException e) { + FsDatasetImpl.LOG.warn( + "Error occurs when waiting volume to close: " + target, e); + } target.shutdown(); FsDatasetImpl.LOG.info("Removed volume: " + target); break; @@ -302,7 +348,7 @@ class FsVolumeList { for (final FsVolumeImpl v : volumes.get()) { Thread t = new Thread() { public void run() { - try { + try (FsVolumeReference ref = v.obtainReference()) { FsDatasetImpl.LOG.info("Scanning block pool " + bpid + " on volume " + v + "..."); long startTime = Time.monotonicNow(); @@ -310,6 +356,8 @@ class FsVolumeList { long timeTaken = Time.monotonicNow() - startTime; FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid + " on " + v + ": " + timeTaken + "ms"); + } catch (ClosedChannelException e) { + // ignore. } catch (IOException ioe) { FsDatasetImpl.LOG.info("Caught exception while scanning " + v + ". 
Will throw later.", ioe); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java index c9aba8a..30ff948 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import java.io.File; import java.io.IOException; @@ -175,13 +176,14 @@ class RamDiskAsyncLazyPersistService { void submitLazyPersistTask(String bpId, long blockId, long genStamp, long creationTime, File metaFile, File blockFile, - FsVolumeImpl targetVolume) throws IOException { + FsVolumeReference target) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("LazyWriter schedule async task to persist RamDisk block pool id: " + bpId + " block id: " + blockId); } - File lazyPersistDir = targetVolume.getLazyPersistDir(bpId); + FsVolumeImpl volume = (FsVolumeImpl)target.getVolume(); + File lazyPersistDir = volume.getLazyPersistDir(bpId); if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) { FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir); throw new 
IOException("LazyWriter fail to find or create lazy persist dir: " @@ -190,8 +192,8 @@ class RamDiskAsyncLazyPersistService { ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask( bpId, blockId, genStamp, creationTime, blockFile, metaFile, - targetVolume, lazyPersistDir); - execute(targetVolume.getCurrentDir(), lazyPersistTask); + target, lazyPersistDir); + execute(volume.getCurrentDir(), lazyPersistTask); } class ReplicaLazyPersistTask implements Runnable { @@ -201,13 +203,13 @@ class RamDiskAsyncLazyPersistService { final long creationTime; final File blockFile; final File metaFile; - final FsVolumeImpl targetVolume; + final FsVolumeReference targetVolume; final File lazyPersistDir; ReplicaLazyPersistTask(String bpId, long blockId, long genStamp, long creationTime, File blockFile, File metaFile, - FsVolumeImpl targetVolume, File lazyPersistDir) { + FsVolumeReference targetVolume, File lazyPersistDir) { this.bpId = bpId; this.blockId = blockId; this.genStamp = genStamp; @@ -236,7 +238,7 @@ class RamDiskAsyncLazyPersistService { // Lock FsDataSetImpl during onCompleteLazyPersist callback datanode.getFSDataset().onCompleteLazyPersist(bpId, blockId, - creationTime, targetFiles, targetVolume); + creationTime, targetFiles, (FsVolumeImpl)targetVolume.getVolume()); succeeded = true; } catch (Exception e){ FsDatasetImpl.LOG.warn( http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto index fd1ba8a..4bd7bda 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto @@ -220,7 +220,7 @@ enum Status { CHECKSUM_OK = 6; ERROR_UNSUPPORTED = 7; OOB_RESTART = 8; // Quick restart - 
OOB_INTERRUPTED = 9; // Interrupted + OOB_RESERVED1 = 9; // Reserved OOB_RESERVED2 = 10; // Reserved OOB_RESERVED3 = 11; // Reserved IN_PROGRESS = 12; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java index b7fdccf..7f318df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java @@ -97,7 +97,7 @@ public class TestWriteBlockGetsBlockLengthHint { * correctly propagate the hint to FsDatasetSpi. */ @Override - public synchronized ReplicaInPipelineInterface createRbw( + public synchronized ReplicaHandler createRbw( StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) throws IOException { assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index e03b756..8ed0b22 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -22,6 +22,7 @@ import java.io.FileDescriptor; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -43,8 +44,8 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; -import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; @@ -147,7 +148,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { oStream = null; } } - + @Override public String getStorageUuid() { return storage.getStorageUuid(); @@ -432,6 +433,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { } @Override + public FsVolumeReference obtainReference() throws ClosedChannelException { + return null; + } + + @Override public String getStorageID() { return storage.getStorageUuid(); } @@ -780,8 +786,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { } @Override // FsDatasetSpi - public synchronized ReplicaInPipelineInterface append(ExtendedBlock b, - long newGS, long expectedBlockLen) throws IOException { + public synchronized ReplicaHandler append( + ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); BInfo binfo = 
map.get(b.getLocalBlock()); if (binfo == null || !binfo.isFinalized()) { @@ -789,12 +795,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { + " is not valid, and cannot be appended to."); } binfo.unfinalizeBlock(); - return binfo; + return new ReplicaHandler(binfo, null); } @Override // FsDatasetSpi - public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, - long newGS, long expectedBlockLen) throws IOException { + public synchronized ReplicaHandler recoverAppend( + ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); BInfo binfo = map.get(b.getLocalBlock()); if (binfo == null) { @@ -807,7 +813,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { map.remove(b); binfo.theBlock.setGenerationStamp(newGS); map.put(binfo.theBlock, binfo); - return binfo; + return new ReplicaHandler(binfo, null); } @Override // FsDatasetSpi @@ -829,8 +835,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { } @Override // FsDatasetSpi - public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, - long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { + public synchronized ReplicaHandler recoverRbw( + ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) + throws IOException { final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); BInfo binfo = map.get(b.getLocalBlock()); if ( binfo == null) { @@ -844,18 +851,18 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { map.remove(b); binfo.theBlock.setGenerationStamp(newGS); map.put(binfo.theBlock, binfo); - return binfo; + return new ReplicaHandler(binfo, null); } @Override // FsDatasetSpi - public synchronized ReplicaInPipelineInterface createRbw( + public synchronized ReplicaHandler createRbw( StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) throws IOException { return 
createTemporary(storageType, b); } @Override // FsDatasetSpi - public synchronized ReplicaInPipelineInterface createTemporary( + public synchronized ReplicaHandler createTemporary( StorageType storageType, ExtendedBlock b) throws IOException { if (isValidBlock(b)) { throw new ReplicaAlreadyExistsException("Block " + b + @@ -868,7 +875,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true); map.put(binfo.theBlock, binfo); - return binfo; + return new ReplicaHandler(binfo, null); } synchronized InputStream getBlockInputStream(ExtendedBlock b http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 4dfc773..22d05a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -553,7 +553,7 @@ public class TestBlockRecovery { LOG.debug("Running " + GenericTestUtils.getMethodName()); } ReplicaInPipelineInterface replicaInfo = dn.data.createRbw( - StorageType.DEFAULT, block, false); + StorageType.DEFAULT, block, false).getReplica(); ReplicaOutputStreams streams = null; try { streams = replicaInfo.createStreams(true, http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java index d468493..dfcbb6d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java @@ -56,6 +56,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; @@ -568,7 +570,7 @@ public class TestDataNodeHotSwapVolumes { @Test(timeout=180000) public void testRemoveVolumeBeingWritten() throws InterruptedException, TimeoutException, ReconfigurationException, - IOException { + IOException, BrokenBarrierException { // test against removing volumes on the different DataNode on the pipeline. for (int i = 0; i < 3; i++) { testRemoveVolumeBeingWrittenForDatanode(i); @@ -582,7 +584,7 @@ public class TestDataNodeHotSwapVolumes { */ private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx) throws IOException, ReconfigurationException, TimeoutException, - InterruptedException { + InterruptedException, BrokenBarrierException { // Starts DFS cluster with 3 DataNodes to form a pipeline. startDFSCluster(1, 3); @@ -599,11 +601,27 @@ public class TestDataNodeHotSwapVolumes { out.write(writeBuf); out.hflush(); + final CyclicBarrier barrier = new CyclicBarrier(2); + List<String> oldDirs = getDataDirs(dn); - String newDirs = oldDirs.get(1); // Remove the first volume. 
- dn.reconfigurePropertyImpl( - DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs); + final String newDirs = oldDirs.get(1); // Remove the first volume. + final List<Exception> exceptions = new ArrayList<>(); + Thread reconfigThread = new Thread() { + public void run() { + try { + barrier.await(); + dn.reconfigurePropertyImpl( + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs); + } catch (ReconfigurationException | + InterruptedException | + BrokenBarrierException e) { + exceptions.add(e); + } + } + }; + reconfigThread.start(); + barrier.await(); rb.nextBytes(writeBuf); out.write(writeBuf); out.hflush(); @@ -614,5 +632,10 @@ public class TestDataNodeHotSwapVolumes { // Read the content back byte[] content = DFSTestUtil.readFileBuffer(fs, testFile); assertEquals(BLOCK_SIZE, content.length); + + reconfigThread.join(); + if (!exceptions.isEmpty()) { + throw new IOException(exceptions.get(0).getCause()); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index be034fb..33675c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.LinkedList; import java.util.List; @@ -44,6 +45,7 @@ import 
org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; import org.apache.hadoop.io.IOUtils; @@ -539,7 +541,12 @@ public class TestDirectoryScanner { public String[] getBlockPoolList() { return new String[0]; } - + + @Override + public FsVolumeReference obtainReference() throws ClosedChannelException { + return null; + } + @Override public long getAvailable() throws IOException { return 0; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index 099a0cd..b9adce4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -67,7 +67,7 @@ public class TestSimulatedFSDataset { // we pass expected len as zero, - fsdataset should use the sizeof actual // data written ReplicaInPipelineInterface bInfo = fsdataset.createRbw( - StorageType.DEFAULT, b, false); + StorageType.DEFAULT, b, false).getReplica(); ReplicaOutputStreams out = bInfo.createStreams(true, DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); try { 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 84e1d58..ca936b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.server.datanode.DNConf; import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataStorage; +import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; @@ -164,14 +165,16 @@ public class TestFsDatasetImpl { assertEquals(actualVolumes, expectedVolumes); } - @Test + @Test(timeout = 30000) public void testRemoveVolumes() throws IOException { // Feed FsDataset with block metadata. 
final int NUM_BLOCKS = 100; for (int i = 0; i < NUM_BLOCKS; i++) { String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length]; ExtendedBlock eb = new ExtendedBlock(bpid, i); - dataset.createRbw(StorageType.DEFAULT, eb, false); + try (ReplicaHandler replica = + dataset.createRbw(StorageType.DEFAULT, eb, false)) { + } } final String[] dataDirs = conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(","); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec2b6ca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java index 60c6d03..5aafc9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException; import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery; @@ -148,7 +149,8 @@ public class TestWriteToReplica { }; ReplicaMap replicasMap = dataSet.volumeMap; - FsVolumeImpl vol = dataSet.volumes.getNextVolume(StorageType.DEFAULT, 0); + FsVolumeImpl vol = (FsVolumeImpl) 
dataSet.volumes + .getNextVolume(StorageType.DEFAULT, 0).getVolume(); ReplicaInfo replicaInfo = new FinalizedReplica( blocks[FINALIZED].getLocalBlock(), vol, vol.getCurrentDir().getParentFile()); replicasMap.add(bpid, replicaInfo); @@ -157,10 +159,10 @@ public class TestWriteToReplica { replicasMap.add(bpid, new ReplicaInPipeline( blocks[TEMPORARY].getBlockId(), - blocks[TEMPORARY].getGenerationStamp(), vol, + blocks[TEMPORARY].getGenerationStamp(), vol, vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock()).getParentFile(), 0)); - replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol, + replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol, vol.createRbwFile(bpid, blocks[RBW].getLocalBlock()).getParentFile(), null); replicasMap.add(bpid, replicaInfo); replicaInfo.getBlockFile().createNewFile(); @@ -489,8 +491,8 @@ public class TestWriteToReplica { long newGenStamp = blocks[NON_EXISTENT].getGenerationStamp() * 10; blocks[NON_EXISTENT].setGenerationStamp(newGenStamp); try { - ReplicaInPipeline replicaInfo = - dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]); + ReplicaInPipelineInterface replicaInfo = + dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]).getReplica(); Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp); Assert.assertTrue( replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId());
