Repository: hadoop Updated Branches: refs/heads/HDFS-7240 770ed9262 -> e721a39e2
HDFS-8392. DataNode support for multiple datasets. (Arpit Agarwal) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e721a39e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e721a39e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e721a39e Branch: refs/heads/HDFS-7240 Commit: e721a39e275ae4ee34af99b2c2450e1793b695ac Parents: 770ed92 Author: Arpit Agarwal <[email protected]> Authored: Mon Jun 1 14:57:07 2015 -0700 Committer: Arpit Agarwal <[email protected]> Committed: Mon Jun 1 14:57:07 2015 -0700 ---------------------------------------------------------------------- .../hadoop-hdfs/CHANGES-HDFS-7240.txt | 6 + .../hadoop/hdfs/server/common/StorageInfo.java | 9 + .../hdfs/server/datanode/BPOfferService.java | 23 +- .../hdfs/server/datanode/BPServiceActor.java | 24 +- .../hdfs/server/datanode/BlockReceiver.java | 35 +-- .../hdfs/server/datanode/BlockSender.java | 20 +- .../hadoop/hdfs/server/datanode/DataNode.java | 269 +++++++++++++------ .../hdfs/server/datanode/DataXceiver.java | 70 ++++- .../hdfs/server/datanode/VolumeScanner.java | 5 +- .../server/datanode/fsdataset/FsDatasetSpi.java | 10 +- .../server/datanode/fsdataset/FsVolumeSpi.java | 4 + .../fsdataset/impl/FsDatasetFactory.java | 25 +- .../datanode/fsdataset/impl/FsDatasetImpl.java | 9 +- .../impl/RamDiskAsyncLazyPersistService.java | 6 +- .../hdfs/TestWriteBlockGetsBlockLengthHint.java | 4 +- .../server/datanode/SimulatedFSDataset.java | 4 +- .../server/datanode/TestBPOfferService.java | 19 +- .../datanode/TestDataNodeInitStorage.java | 3 +- 18 files changed, 384 insertions(+), 161 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt 
b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt new file mode 100644 index 0000000..a98d407 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt @@ -0,0 +1,6 @@ + Breakdown of HDFS-7240 sub-tasks: + + HDFS-8210. Ozone: Implement storage container manager. (Jitendra Pandey) + + HDFS-8392. Ozone: DataNode support for multiple datasets. (Arpit Agarwal) + http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java index 50363c9..5a19dae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java @@ -220,6 +220,15 @@ public class StorageInfo { this.layoutVersion = lv; } + /** + * Return the type of node serviced by this storage. + * + * @return type of node serviced by this storage. + */ + public NodeType getNodeType() { + return storageType; + } + public int getServiceLayoutVersion() { return storageType == NodeType.DATA_NODE ? 
HdfsServerConstants.DATANODE_LAYOUT_VERSION : HdfsServerConstants.NAMENODE_LAYOUT_VERSION; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 92323f1..092a8f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.protocol.*; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; @@ -70,6 +71,8 @@ class BPOfferService { private final DataNode dn; + private FsDatasetSpi<?> dataset = null; + /** * A reference to the BPServiceActor associated with the currently * ACTIVE NN. 
In the case that all NameNodes are in STANDBY mode, @@ -303,7 +306,8 @@ class BPOfferService { * verifies that this namespace matches (eg to prevent a misconfiguration * where a StandbyNode from a different cluster is specified) */ - void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException { + FsDatasetSpi<?> verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) + throws IOException { writeLock(); try { if (this.bpNSInfo == null) { @@ -314,7 +318,7 @@ class BPOfferService { // The DN can now initialize its local storage if we are the // first BP to handshake, etc. try { - dn.initBlockPool(this); + dataset = dn.initBlockPool(this); success = true; } finally { if (!success) { @@ -335,6 +339,7 @@ class BPOfferService { } finally { writeUnlock(); } + return dataset; } /** @@ -480,11 +485,11 @@ class BPOfferService { } String bpid = getBlockPoolId(); if (!rollingUpgradeStatus.isFinalized()) { - dn.getFSDataset().enableTrash(bpid); - dn.getFSDataset().setRollingUpgradeMarker(bpid); + dataset.enableTrash(bpid); + dataset.setRollingUpgradeMarker(bpid); } else { - dn.getFSDataset().clearTrash(bpid); - dn.getFSDataset().clearRollingUpgradeMarker(bpid); + dataset.clearTrash(bpid); + dataset.clearRollingUpgradeMarker(bpid); } } @@ -665,7 +670,7 @@ class BPOfferService { Block toDelete[] = bcmd.getBlocks(); try { // using global fsdataset - dn.getFSDataset().invalidate(bcmd.getBlockPoolId(), toDelete); + dataset.invalidate(bcmd.getBlockPoolId(), toDelete); } catch(IOException e) { // Exceptions caught here are not expected to be disk-related. 
throw e; @@ -676,13 +681,13 @@ class BPOfferService { LOG.info("DatanodeCommand action: DNA_CACHE for " + blockIdCmd.getBlockPoolId() + " of [" + blockIdArrayToString(blockIdCmd.getBlockIds()) + "]"); - dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds()); + dataset.cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds()); break; case DatanodeProtocol.DNA_UNCACHE: LOG.info("DatanodeCommand action: DNA_UNCACHE for " + blockIdCmd.getBlockPoolId() + " of [" + blockIdArrayToString(blockIdCmd.getBlockIds()) + "]"); - dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds()); + dataset.uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds()); break; case DatanodeProtocol.DNA_SHUTDOWN: // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 63a0bb6..1fe1c9e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import 
org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; @@ -110,6 +111,7 @@ class BPServiceActor implements Runnable { private volatile boolean sendImmediateIBR = false; private volatile boolean shouldServiceRun = true; private final DataNode dn; + private FsDatasetSpi<?> dataset = null; private final DNConf dnConf; private long prevBlockReportId; @@ -220,7 +222,7 @@ class BPServiceActor implements Runnable { // Verify that this matches the other NN in this HA pair. // This also initializes our block pool in the DN if we are // the first NN connection for this BP. - bpos.verifyAndSetNamespaceInfo(nsInfo); + dataset = bpos.verifyAndSetNamespaceInfo(nsInfo); // Second phase of the handshake with the NN. register(nsInfo); @@ -330,7 +332,7 @@ class BPServiceActor implements Runnable { String storageUuid, boolean now) { synchronized (pendingIncrementalBRperStorage) { addPendingReplicationBlockInfo( - bInfo, dn.getFSDataset().getStorage(storageUuid)); + bInfo, dataset.getStorage(storageUuid)); sendImmediateIBR = true; // If now is true, the report is sent right away. // Otherwise, it will be sent out in the next heartbeat. @@ -344,7 +346,7 @@ class BPServiceActor implements Runnable { ReceivedDeletedBlockInfo bInfo, String storageUuid) { synchronized (pendingIncrementalBRperStorage) { addPendingReplicationBlockInfo( - bInfo, dn.getFSDataset().getStorage(storageUuid)); + bInfo, dataset.getStorage(storageUuid)); } } @@ -435,7 +437,7 @@ class BPServiceActor implements Runnable { long brCreateStartTime = monotonicNow(); Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists = - dn.getFSDataset().getBlockReports(bpos.getBlockPoolId()); + dataset.getBlockReports(bpos.getBlockPoolId()); // Convert the reports to the format expected by the NN. 
int i = 0; @@ -508,7 +510,7 @@ class BPServiceActor implements Runnable { DatanodeCommand cacheReport() throws IOException { // If caching is disabled, do not send a cache report - if (dn.getFSDataset().getCacheCapacity() == 0) { + if (dataset.getCacheCapacity() == 0) { return null; } // send cache report if timer has expired. @@ -521,7 +523,7 @@ class BPServiceActor implements Runnable { lastCacheReport = startTime; String bpid = bpos.getBlockPoolId(); - List<Long> blockIds = dn.getFSDataset().getCacheReport(bpid); + List<Long> blockIds = dataset.getCacheReport(bpid); long createTime = monotonicNow(); cmd = bpNamenode.cacheReport(bpRegistration, bpid, blockIds); @@ -540,20 +542,20 @@ class BPServiceActor implements Runnable { HeartbeatResponse sendHeartBeat() throws IOException { StorageReport[] reports = - dn.getFSDataset().getStorageReports(bpos.getBlockPoolId()); + dataset.getStorageReports(bpos.getBlockPoolId()); if (LOG.isDebugEnabled()) { LOG.debug("Sending heartbeat with " + reports.length + " storage reports from service actor: " + this); } - VolumeFailureSummary volumeFailureSummary = dn.getFSDataset() - .getVolumeFailureSummary(); + VolumeFailureSummary volumeFailureSummary = + dataset.getVolumeFailureSummary(); int numFailedVolumes = volumeFailureSummary != null ? 
volumeFailureSummary.getFailedStorageLocations().length : 0; return bpNamenode.sendHeartbeat(bpRegistration, reports, - dn.getFSDataset().getCacheCapacity(), - dn.getFSDataset().getCacheUsed(), + dataset.getCacheCapacity(), + dataset.getCacheUsed(), dn.getXmitsInProgress(), dn.getXceiverCount(), numFailedVolumes, http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 2e11600..686541b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -99,6 +100,7 @@ class BlockReceiver implements Closeable { private ReplicaOutputStreams streams; private DatanodeInfo srcDataNode = null; private final DataNode datanode; + private final FsDatasetSpi<?> dataset; volatile private boolean mirrorError; // Cache management state @@ -141,8 +143,8 @@ class BlockReceiver implements Closeable { final BlockConstructionStage 
stage, final long newGs, final long minBytesRcvd, final long maxBytesRcvd, final String clientname, final DatanodeInfo srcDataNode, - final DataNode datanode, DataChecksum requestedChecksum, - CachingStrategy cachingStrategy, + final DataNode datanode, final FsDatasetSpi<?> dataset, + DataChecksum requestedChecksum, CachingStrategy cachingStrategy, final boolean allowLazyPersist, final boolean pinning) throws IOException { try{ @@ -152,6 +154,7 @@ class BlockReceiver implements Closeable { this.myAddr = myAddr; this.srcDataNode = srcDataNode; this.datanode = datanode; + this.dataset = dataset; this.clientname = clientname; this.isDatanode = clientname.length() == 0; @@ -183,27 +186,27 @@ class BlockReceiver implements Closeable { // Open local disk out // if (isDatanode) { //replication or move - replicaHandler = datanode.data.createTemporary(storageType, block); + replicaHandler = dataset.createTemporary(storageType, block); } else { switch (stage) { case PIPELINE_SETUP_CREATE: - replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist); + replicaHandler = dataset.createRbw(storageType, block, allowLazyPersist); datanode.notifyNamenodeReceivingBlock( block, replicaHandler.getReplica().getStorageUuid()); break; case PIPELINE_SETUP_STREAMING_RECOVERY: - replicaHandler = datanode.data.recoverRbw( + replicaHandler = dataset.recoverRbw( block, newGs, minBytesRcvd, maxBytesRcvd); block.setGenerationStamp(newGs); break; case PIPELINE_SETUP_APPEND: - replicaHandler = datanode.data.append(block, newGs, minBytesRcvd); + replicaHandler = dataset.append(block, newGs, minBytesRcvd); block.setGenerationStamp(newGs); datanode.notifyNamenodeReceivingBlock( block, replicaHandler.getReplica().getStorageUuid()); break; case PIPELINE_SETUP_APPEND_RECOVERY: - replicaHandler = datanode.data.recoverAppend(block, newGs, minBytesRcvd); + replicaHandler = dataset.recoverAppend(block, newGs, minBytesRcvd); block.setGenerationStamp(newGs); 
datanode.notifyNamenodeReceivingBlock( block, replicaHandler.getReplica().getStorageUuid()); @@ -212,7 +215,7 @@ class BlockReceiver implements Closeable { case TRANSFER_FINALIZED: // this is a transfer destination replicaHandler = - datanode.data.createTemporary(storageType, block); + dataset.createTemporary(storageType, block); break; default: throw new IOException("Unsupported stage " + stage + " while receiving block " + block + " from " + inAddr); @@ -717,7 +720,7 @@ class BlockReceiver implements Closeable { // if (syncBehindWrites) { if (syncBehindWritesInBackground) { - this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest( + dataset.submitBackgroundSyncFileRangeRequest( block, outFd, lastCacheManagementOffset, offsetInBlock - lastCacheManagementOffset, NativeIO.POSIX.SYNC_FILE_RANGE_WRITE); @@ -807,11 +810,11 @@ class BlockReceiver implements Closeable { if (stage == BlockConstructionStage.TRANSFER_RBW) { // for TRANSFER_RBW, convert temporary to RBW - datanode.data.convertTemporaryToRbw(block); + dataset.convertTemporaryToRbw(block); } else { // for isDatnode or TRANSFER_FINALIZED // Finalize the block. 
- datanode.data.finalizeBlock(block); + dataset.finalizeBlock(block); } } datanode.metrics.incrBlocksWritten(); @@ -904,7 +907,7 @@ class BlockReceiver implements Closeable { */ private void cleanupBlock() throws IOException { if (isDatanode) { - datanode.data.unfinalizeBlock(block); + dataset.unfinalizeBlock(block); } } @@ -921,7 +924,7 @@ class BlockReceiver implements Closeable { } // rollback the position of the meta file - datanode.data.adjustCrcChannelPosition(block, streams, checksumSize); + dataset.adjustCrcChannelPosition(block, streams, checksumSize); } /** @@ -959,7 +962,7 @@ class BlockReceiver implements Closeable { byte[] buf = new byte[sizePartialChunk]; byte[] crcbuf = new byte[checksumSize]; try (ReplicaInputStreams instr = - datanode.data.getTmpInputStreams(block, blkoff, ckoff)) { + dataset.getTmpInputStreams(block, blkoff, ckoff)) { IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk); // open meta file and read in crc value computer earlier @@ -1298,11 +1301,11 @@ class BlockReceiver implements Closeable { BlockReceiver.this.close(); endTime = ClientTraceLog.isInfoEnabled() ? 
System.nanoTime() : 0; block.setNumBytes(replicaInfo.getNumBytes()); - datanode.data.finalizeBlock(block); + dataset.finalizeBlock(block); } if (pinning) { - datanode.data.setPinning(block); + dataset.setPinning(block); } datanode.closeBlock( http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 79f4dd7..5f4ea10 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.util.DataTransferThrottler; @@ -147,6 +148,7 @@ class BlockSender implements java.io.Closeable { private final String clientTraceFmt; private volatile ChunkChecksum lastChunkChecksum = null; private DataNode datanode; + private final FsDatasetSpi<?> dataset; /** The file descriptor of the block being sent */ private FileDescriptor blockInFd; @@ -190,7 +192,8 @@ class BlockSender implements java.io.Closeable { */ BlockSender(ExtendedBlock block, long startOffset, long length, boolean corruptChecksumOk, boolean verifyChecksum, - boolean sendChecksum, DataNode datanode, String 
clientTraceFmt, + boolean sendChecksum, DataNode datanode, + final FsDatasetSpi<?> dataset, String clientTraceFmt, CachingStrategy cachingStrategy) throws IOException { try { @@ -227,6 +230,7 @@ class BlockSender implements java.io.Closeable { this.readaheadLength = cachingStrategy.getReadahead().longValue(); } this.datanode = datanode; + this.dataset = dataset; if (verifyChecksum) { // To simplify implementation, callers may not specify verification @@ -237,7 +241,7 @@ class BlockSender implements java.io.Closeable { final Replica replica; final long replicaVisibleLength; - synchronized(datanode.data) { + synchronized(dataset) { replica = getReplica(block, datanode); replicaVisibleLength = replica.getVisibleLength(); } @@ -274,7 +278,7 @@ class BlockSender implements java.io.Closeable { (!is32Bit || length <= Integer.MAX_VALUE); // Obtain a reference before reading data - this.volumeRef = datanode.data.getVolume(block).obtainReference(); + this.volumeRef = dataset.getVolume(block).obtainReference(); /* * (corruptChecksumOK, meta_file_exist): operation @@ -288,7 +292,7 @@ class BlockSender implements java.io.Closeable { LengthInputStream metaIn = null; boolean keepMetaInOpen = false; try { - metaIn = datanode.data.getMetaDataInputStream(block); + metaIn = dataset.getMetaDataInputStream(block); if (!corruptChecksumOk || metaIn != null) { if (metaIn == null) { //need checksum but meta-data not found @@ -387,7 +391,7 @@ class BlockSender implements java.io.Closeable { if (DataNode.LOG.isDebugEnabled()) { DataNode.LOG.debug("replica=" + replica); } - blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset + blockIn = dataset.getBlockInputStream(block, offset); // seek to offset if (blockIn instanceof FileInputStream) { blockInFd = ((FileInputStream)blockIn).getFD(); } else { @@ -451,8 +455,10 @@ class BlockSender implements java.io.Closeable { private static Replica getReplica(ExtendedBlock block, DataNode datanode) throws ReplicaNotFoundException 
{ - Replica replica = datanode.data.getReplica(block.getBlockPoolId(), - block.getBlockId()); + final FsDatasetSpi<?> dataset = + datanode.getFSDataset(block.getBlockPoolId()); + Replica replica = + dataset.getReplica(block.getBlockPoolId(), block.getBlockId()); if (replica == null) { throw new ReplicaNotFoundException(block); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index d2b2939..70109a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -79,6 +79,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -285,9 +286,33 @@ public class DataNode extends ReconfigurableBase volatile boolean shutdownForUpgrade = false; private boolean shutdownInProgress = false; private BlockPoolManager blockPoolManager; - volatile FsDatasetSpi<? extends FsVolumeSpi> data = null; + + private final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> datasetFactory; + + // This is an onto (many-one) mapping. Multiple block pool IDs may share + // the same dataset. + private volatile Map<String, + FsDatasetSpi<? 
extends FsVolumeSpi>> datasetsMap = + new ConcurrentHashMap<>(); + + // Hash set of datasets, used to avoid having to deduplicate the values of datasetsMap + // every time we need to iterate over all datasets. + private volatile Set<FsDatasetSpi<? extends FsVolumeSpi>> datasets = + Collections.newSetFromMap( + new ConcurrentHashMap<FsDatasetSpi<? extends FsVolumeSpi>, + Boolean>()); + private String clusterId = null; + /** + * Do NOT reference this field outside of tests. + * It is retained to avoid breaking existing tests and subject to removal. + * In existing HDFS unit tests we are guaranteed not to have more than one + * dataset instance. + */ + @VisibleForTesting + volatile FsDatasetSpi<? extends FsVolumeSpi> data = null; + public final static String EMPTY_DEL_HINT = ""; final AtomicInteger xmitsInProgress = new AtomicInteger(); Daemon dataXceiverServer = null; @@ -319,7 +344,8 @@ public class DataNode extends ReconfigurableBase private boolean hasAnyBlockPoolRegistered = false; private final BlockScanner blockScanner; - private DirectoryScanner directoryScanner = null; + private Map<FsDatasetSpi<?>, DirectoryScanner> directoryScannersMap = + new ConcurrentHashMap<>(); /** Activated plug-ins. 
*/ private List<ServicePlugin> plugins; @@ -373,6 +399,7 @@ public class DataNode extends ReconfigurableBase this.getHdfsBlockLocationsEnabled = false; this.blockScanner = new BlockScanner(this, conf); this.pipelineSupportECN = false; + this.datasetFactory = null; } /** @@ -387,7 +414,7 @@ public class DataNode extends ReconfigurableBase this.lastDiskErrorCheck = 0; this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY, DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT); - + datasetFactory = FsDatasetSpi.Factory.getFactory(conf); this.usersWithLocalPathAccess = Arrays.asList( conf.getTrimmedStrings(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY)); this.connectToDnViaHostname = conf.getBoolean( @@ -595,7 +622,9 @@ public class DataNode extends ReconfigurableBase @Override public IOException call() { try { - data.addVolume(location, nsInfos); + for (FsDatasetSpi<?> dataset : datasets) { + dataset.addVolume(location, nsInfos); + } } catch (IOException e) { return e; } @@ -698,7 +727,9 @@ public class DataNode extends ReconfigurableBase IOException ioe = null; // Remove volumes and block infos from FsDataset. - data.removeVolumes(absoluteVolumePaths, clearFailure); + for (final FsDatasetSpi<?> dataset : datasets) { + dataset.removeVolumes(absoluteVolumePaths, clearFailure); + } // Remove volumes from DataStorage. 
try { @@ -878,36 +909,42 @@ public class DataNode extends ReconfigurableBase } private void shutdownPeriodicScanners() { - shutdownDirectoryScanner(); + shutdownDirectoryScanners(); blockScanner.removeAllVolumeScanners(); } /** * See {@link DirectoryScanner} */ - private synchronized void initDirectoryScanner(Configuration conf) { - if (directoryScanner != null) { - return; - } - String reason = null; - if (conf.getInt(DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, - DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT) < 0) { - reason = "verification is turned off by configuration"; - } else if ("SimulatedFSDataset".equals(data.getClass().getSimpleName())) { - reason = "verifcation is not supported by SimulatedFSDataset"; - } - if (reason == null) { - directoryScanner = new DirectoryScanner(this, data, conf); - directoryScanner.start(); - } else { - LOG.info("Periodic Directory Tree Verification scan is disabled because " + - reason); + private synchronized void initDirectoryScanners(Configuration conf) { + for (FsDatasetSpi<?> dataset : datasets) { + if (directoryScannersMap.get(dataset) != null) { + continue; + } + + String reason = null; + if (conf.getInt(DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, + DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT) < 0) { + reason = "verification is turned off by configuration"; + } else if ("SimulatedFSDataset".equals( + dataset.getClass().getSimpleName())) { + reason = "verifcation is not supported by SimulatedFSDataset"; + } + if (reason == null) { + DirectoryScanner scanner = new DirectoryScanner(this, dataset, conf); + directoryScannersMap.put(dataset, scanner); + scanner.start(); + } else { + LOG.info( + "Periodic Directory Tree Verification scan is disabled because " + + reason); + } } } - private synchronized void shutdownDirectoryScanner() { - if (directoryScanner != null) { - directoryScanner.shutdown(); + private synchronized void shutdownDirectoryScanners() { + for (DirectoryScanner scanner : directoryScannersMap.values()) { + 
scanner.shutdown(); } } @@ -1013,7 +1050,7 @@ public class DataNode extends ReconfigurableBase */ public void reportBadBlocks(ExtendedBlock block) throws IOException{ BPOfferService bpos = getBPOSForBlock(block); - FsVolumeSpi volume = getFSDataset().getVolume(block); + FsVolumeSpi volume = getFSDataset(block.getBlockPoolId()).getVolume(block); bpos.reportBadBlocks( block, volume.getStorageID(), volume.getStorageType()); } @@ -1328,8 +1365,9 @@ public class DataNode extends ReconfigurableBase blockScanner.disableBlockPoolId(bpId); - if (data != null) { - data.shutdownBlockPool(bpId); + FsDatasetSpi<?> dataset = getFSDataset(bpId); + if (dataset != null) { + dataset.shutdownBlockPool(bpId); } if (storage != null) { @@ -1350,7 +1388,7 @@ public class DataNode extends ReconfigurableBase * @param bpos Block pool offer service * @throws IOException if the NN is inconsistent with the local storage. */ - void initBlockPool(BPOfferService bpos) throws IOException { + FsDatasetSpi<?> initBlockPool(BPOfferService bpos) throws IOException { NamespaceInfo nsInfo = bpos.getNamespaceInfo(); if (nsInfo == null) { throw new IOException("NamespaceInfo not found: Block pool " + bpos @@ -1364,15 +1402,16 @@ public class DataNode extends ReconfigurableBase // In the case that this is the first block pool to connect, initialize // the dataset, block scanners, etc. - initStorage(nsInfo); + FsDatasetSpi<?> dataset = initStorage(bpos.getBlockPoolId(), nsInfo); // Exclude failed disks before initializing the block pools to avoid startup // failures. 
- checkDiskError(); + checkDiskError(getFSDataset(nsInfo.getBlockPoolID())); - initDirectoryScanner(conf); - data.addBlockPool(nsInfo.getBlockPoolID(), conf); + initDirectoryScanners(conf); + dataset.addBlockPool(nsInfo.getBlockPoolID(), conf); blockScanner.enableBlockPoolId(bpos.getBlockPoolId()); + return dataset; } List<BPOfferService> getAllBpOs() { @@ -1387,11 +1426,9 @@ public class DataNode extends ReconfigurableBase * Initializes the {@link #data}. The initialization is done only once, when * handshake with the the first namenode is completed. */ - private void initStorage(final NamespaceInfo nsInfo) throws IOException { - final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory - = FsDatasetSpi.Factory.getFactory(conf); - - if (!factory.isSimulated()) { + private FsDatasetSpi<?> initStorage( + final String blockPoolId, final NamespaceInfo nsInfo) throws IOException { + if (!datasetFactory.isSimulated()) { final StartupOption startOpt = getStartupOption(conf); if (startOpt == null) { throw new IOException("Startup option not set."); @@ -1409,12 +1446,7 @@ public class DataNode extends ReconfigurableBase // If this is a newly formatted DataNode then assign a new DatanodeUuid. 
checkDatanodeUuid(); - - synchronized(this) { - if (data == null) { - data = factory.newInstance(this, storage, conf); - } - } + return allocateFsDataset(blockPoolId, nsInfo.getNodeType()); } /** @@ -1556,8 +1588,9 @@ public class DataNode extends ReconfigurableBase Token<BlockTokenIdentifier> token) throws IOException { checkBlockLocalPathAccess(); checkBlockToken(block, token, BlockTokenIdentifier.AccessMode.READ); - Preconditions.checkNotNull(data, "Storage not yet initialized"); - BlockLocalPathInfo info = data.getBlockLocalPathInfo(block); + FsDatasetSpi<?> dataset = getFSDataset(block.getBlockPoolId()); + Preconditions.checkNotNull(dataset, "Storage not yet initialized"); + BlockLocalPathInfo info = dataset.getBlockLocalPathInfo(block); if (LOG.isDebugEnabled()) { if (info != null) { if (LOG.isTraceEnabled()) { @@ -1612,8 +1645,10 @@ public class DataNode extends ReconfigurableBase FileInputStream fis[] = new FileInputStream[2]; try { - fis[0] = (FileInputStream)data.getBlockInputStream(blk, 0); - fis[1] = DatanodeUtil.getMetaDataInputStream(blk, data); + final FsDatasetSpi<?> dataset = getFSDataset(blk.getBlockPoolId()); + Preconditions.checkNotNull(dataset, "Storage not yet initialized"); + fis[0] = (FileInputStream) dataset.getBlockInputStream(blk, 0); + fis[1] = DatanodeUtil.getMetaDataInputStream(blk, dataset); } catch (ClassCastException e) { LOG.debug("requestShortCircuitFdsForRead failed", e); throw new ShortCircuitFdsUnsupportedException("This DataNode's " + @@ -1642,7 +1677,9 @@ public class DataNode extends ReconfigurableBase DataNodeFaultInjector.get().getHdfsBlocksMetadata(); - return data.getHdfsBlocksMetadata(bpId, blockIds); + final FsDatasetSpi<?> dataset = getFSDataset(bpId); + Preconditions.checkNotNull(dataset, "Storage not yet initialized"); + return dataset.getHdfsBlocksMetadata(bpId, blockIds); } private void checkBlockToken(ExtendedBlock block, Token<BlockTokenIdentifier> token, @@ -1804,8 +1841,8 @@ public class DataNode extends 
ReconfigurableBase LOG.warn("Exception when unlocking storage: " + ie, ie); } } - if (data != null) { - data.shutdown(); + for (FsDatasetSpi<?> dataset : datasets) { + dataset.shutdown(); } if (metrics != null) { metrics.shutdown(); @@ -1842,8 +1879,9 @@ public class DataNode extends ReconfigurableBase } } - private void handleDiskError(String errMsgr) { - final boolean hasEnoughResources = data.hasEnoughResource(); + private void handleDiskError(final FsDatasetSpi<?> dataset, + final String errMsgr) { + final boolean hasEnoughResources = dataset.hasEnoughResource(); LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResources); // If we have enough active valid volumes then we do not want to @@ -1902,7 +1940,7 @@ public class DataNode extends ReconfigurableBase private void reportBadBlock(final BPOfferService bpos, final ExtendedBlock block, final String msg) { - FsVolumeSpi volume = getFSDataset().getVolume(block); + FsVolumeSpi volume = getFSDataset(block.getBlockPoolId()).getVolume(block); bpos.reportBadBlocks( block, volume.getStorageID(), volume.getStorageType()); LOG.warn(msg); @@ -1917,9 +1955,11 @@ public class DataNode extends ReconfigurableBase boolean replicaStateNotFinalized = false; boolean blockFileNotExist = false; boolean lengthTooShort = false; + final FsDatasetSpi<?> dataset = getFSDataset(block.getBlockPoolId()); + Preconditions.checkNotNull(dataset, "Storage not yet initialized"); try { - data.checkBlock(block, block.getNumBytes(), ReplicaState.FINALIZED); + dataset.checkBlock(block, block.getNumBytes(), ReplicaState.FINALIZED); } catch (ReplicaNotFoundException e) { replicaNotExist = true; } catch (UnexpectedReplicaStateException e) { @@ -1949,10 +1989,13 @@ public class DataNode extends ReconfigurableBase } if (lengthTooShort) { // Check if NN recorded length matches on-disk length - // Shorter on-disk len indicates corruption so report NN the corrupt block + // Shorter on-disk len indicates corruption so report NN + // the 
corrupt block reportBadBlock(bpos, block, "Can't replicate block " + block - + " because on-disk length " + data.getLength(block) - + " is shorter than NameNode recorded length " + block.getNumBytes()); + + " because on-disk length " + + getFSDataset(block.getBlockPoolId()).getLength(block) + + " is shorter than NameNode recorded length " + + block.getNumBytes()); return; } @@ -2159,7 +2202,8 @@ public class DataNode extends ReconfigurableBase DFSUtil.getSmallBufferSize(conf))); in = new DataInputStream(unbufIn); blockSender = new BlockSender(b, 0, b.getNumBytes(), - false, false, true, DataNode.this, null, cachingStrategy); + false, false, true, DataNode.this, getFSDataset(b.getBlockPoolId()), + null, cachingStrategy); DatanodeInfo srcNode = new DatanodeInfo(bpReg); new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken, @@ -2447,8 +2491,10 @@ public class DataNode extends ReconfigurableBase @Override public String toString() { - return "DataNode{data=" + data + ", localName='" + getDisplayName() - + "', datanodeUuid='" + storage.getDatanodeUuid() + "', xmitsInProgress=" + return "DataNode{datasets=" + datasets.toString() + + ", localName='" + getDisplayName() + + "', datanodeUuid='" + storage.getDatanodeUuid() + + "', xmitsInProgress=" + xmitsInProgress.get() + "}"; } @@ -2506,14 +2552,61 @@ public class DataNode extends ReconfigurableBase } /** + * Allocate a new dataset for the given serviceType. This may return a + * previously allocated dataset. + * + * @param bpid + * @param serviceType + * @return + * @throws IOException + */ + private FsDatasetSpi<?> allocateFsDataset( + final String bpid, final NodeType serviceType) throws IOException { + FsDatasetSpi<?> dataset = + datasetFactory.newInstance(this, storage, conf, serviceType); + datasets.add(dataset); + datasetsMap.put(bpid, dataset); + + if (serviceType == NodeType.NAME_NODE) { + // 'data' is retained for existing mock-based HDFS unit tests. 
+ Preconditions.checkState(data == null || data == dataset); + data = dataset; + } + + return dataset; + } + + /** * Examples are adding and deleting blocks directly. * The most common usage will be when the data node's storage is simulated. * * @return the fsdataset that stores the blocks */ @VisibleForTesting + public FsDatasetSpi<?> getFSDataset(final String bpid) { + return datasetsMap.get(bpid); + } + + @VisibleForTesting + public Set<FsDatasetSpi<?>> getFSDatasets() { + return datasets; + } + + /** + * Do NOT use this method outside of tests. + * Retained for compatibility with existing tests and subject to removal. + * + * @return the fsdataset that stores the blocks + */ + @VisibleForTesting public FsDatasetSpi<?> getFSDataset() { - return data; + Preconditions.checkState(datasets.size() <= 1, + "Did not expect more than one Dataset here."); + + if (datasets.size() == 0) { + return null; + } + return (FsDatasetSpi<?>) datasets.iterator().next(); } @VisibleForTesting @@ -2522,9 +2615,15 @@ public class DataNode extends ReconfigurableBase return blockScanner; } + /** + * Do NOT use this method outside of tests. + * Retained for compatibility with existing tests and subject to removal. 
+ * + * @return + */ @VisibleForTesting DirectoryScanner getDirectoryScanner() { - return directoryScanner; + return directoryScannersMap.get(getFSDataset()); } public static void secureMain(String args[], SecureResources resources) { @@ -2584,7 +2683,12 @@ public class DataNode extends ReconfigurableBase @Override // InterDatanodeProtocol public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) throws IOException { - return data.initReplicaRecovery(rBlock); + final FsDatasetSpi<?> dataset = + getFSDataset(rBlock.getBlock().getBlockPoolId()); + if (dataset != null) { + return dataset.initReplicaRecovery(rBlock); + } + return null; } /** @@ -2608,8 +2712,10 @@ public class DataNode extends ReconfigurableBase public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock, final long recoveryId, final long newBlockId, final long newLength) throws IOException { - final String storageID = data.updateReplicaUnderRecovery(oldBlock, - recoveryId, newBlockId, newLength); + final String storageID = + getFSDataset(oldBlock.getBlockPoolId()) + .updateReplicaUnderRecovery(oldBlock, recoveryId, + newBlockId, newLength); // Notify the namenode of the updated block info. This is important // for HA, since otherwise the standby node may lose track of the // block locations until the next block report. 
@@ -2849,7 +2955,7 @@ public class DataNode extends ReconfigurableBase @Override // ClientDataNodeProtocol public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException { checkReadAccess(block); - return data.getReplicaVisibleLength(block); + return getFSDataset(block.getBlockPoolId()).getReplicaVisibleLength(block); } private void checkReadAccess(final ExtendedBlock block) throws IOException { @@ -2886,10 +2992,11 @@ public class DataNode extends ReconfigurableBase final long storedGS; final long visible; final BlockConstructionStage stage; + final FsDatasetSpi<?> dataset = getFSDataset(b.getBlockPoolId()); //get replica information - synchronized(data) { - Block storedBlock = data.getStoredBlock(b.getBlockPoolId(), + synchronized(dataset) { + Block storedBlock = dataset.getStoredBlock(b.getBlockPoolId(), b.getBlockId()); if (null == storedBlock) { throw new IOException(b + " not found in datanode."); @@ -2901,15 +3008,16 @@ public class DataNode extends ReconfigurableBase } // Update the genstamp with storedGS b.setGenerationStamp(storedGS); - if (data.isValidRbw(b)) { + if (dataset.isValidRbw(b)) { stage = BlockConstructionStage.TRANSFER_RBW; - } else if (data.isValidBlock(b)) { + } else if (dataset.isValidBlock(b)) { stage = BlockConstructionStage.TRANSFER_FINALIZED; } else { - final String r = data.getReplicaString(b.getBlockPoolId(), b.getBlockId()); + final String r = dataset.getReplicaString( + b.getBlockPoolId(), b.getBlockId()); throw new IOException(b + " is neither a RBW nor a Finalized, r=" + r); } - visible = data.getReplicaVisibleLength(b); + visible = dataset.getReplicaVisibleLength(b); } //set visible length b.setNumBytes(visible); @@ -2990,6 +3098,7 @@ public class DataNode extends ReconfigurableBase */ @Override // DataNodeMXBean public String getVolumeInfo() { + // Default implementation for backwards compatibility. 
Preconditions.checkNotNull(data, "Storage not yet initialized"); return JSON.toString(data.getVolumeInfoMap()); } @@ -3024,7 +3133,7 @@ public class DataNode extends ReconfigurableBase "shutdown the block pool service"); } - data.deleteBlockPool(blockPoolId, force); + getFSDataset(blockPoolId).deleteBlockPool(blockPoolId, force); } @Override // ClientDatanodeProtocol @@ -3179,8 +3288,8 @@ public class DataNode extends ReconfigurableBase /** * Check the disk error */ - private void checkDiskError() { - Set<File> unhealthyDataDirs = data.checkDataDir(); + private void checkDiskError(final FsDatasetSpi<?> dataset) { + Set<File> unhealthyDataDirs = dataset.checkDataDir(); if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) { try { // Remove all unhealthy volumes from DataNode. @@ -3193,7 +3302,7 @@ public class DataNode extends ReconfigurableBase for (File dataDir : unhealthyDataDirs) { sb.append(dataDir.getAbsolutePath() + ";"); } - handleDiskError(sb.toString()); + handleDiskError(dataset, sb.toString()); } } @@ -3213,7 +3322,9 @@ public class DataNode extends ReconfigurableBase } if(tempFlag) { try { - checkDiskError(); + for (final FsDatasetSpi<?> dataset : datasets) { + checkDiskError(dataset); + } } catch (Exception e) { LOG.warn("Unexpected exception occurred while checking disk error " + e); checkDiskErrorThread = null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 26d669c..fbb8897 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException; import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; @@ -312,8 +313,9 @@ class DataXceiver extends Receiver implements Runnable { "anything but a UNIX domain socket."); } if (slotId != null) { - boolean isCached = datanode.data. - isCached(blk.getBlockPoolId(), blk.getBlockId()); + final String bpid = blk.getBlockPoolId(); + boolean isCached = datanode.getFSDataset(bpid). 
+ isCached(bpid, blk.getBlockId()); datanode.shortCircuitRegistry.registerSlot( ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached); registeredSlotId = slotId; @@ -523,7 +525,14 @@ class DataXceiver extends Receiver implements Runnable { baseStream, smallBufferSize)); checkAccess(out, true, block, blockToken, Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ); - + + final FsDatasetSpi<?> dataset = + datanode.getFSDataset(block.getBlockPoolId()); + if (dataset == null) { + throw new IOException( + "Unknown or unitialized blockpool " + block.getBlockPoolId()); + } + // send the block BlockSender blockSender = null; DatanodeRegistration dnR = @@ -540,7 +549,7 @@ class DataXceiver extends Receiver implements Runnable { try { try { blockSender = new BlockSender(block, blockOffset, length, - true, false, sendChecksum, datanode, clientTraceFmt, + true, false, sendChecksum, datanode, dataset, clientTraceFmt, cachingStrategy); } catch(IOException e) { String msg = "opReadBlock " + block + " received exception " + e; @@ -630,6 +639,13 @@ class DataXceiver extends Receiver implements Runnable { final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW || stage == BlockConstructionStage.TRANSFER_FINALIZED; long size = 0; + final FsDatasetSpi<?> dataset = + datanode.getFSDataset(block.getBlockPoolId()); + if (dataset == null) { + throw new IOException( + "Unknown or unitialized blockpool " + block.getBlockPoolId()); + } + // check single target for transfer-RBW/Finalized if (isTransfer && targets.length > 0) { throw new IOException(stage + " does not support multiple targets " @@ -683,12 +699,12 @@ class DataXceiver extends Receiver implements Runnable { peer.getRemoteAddressString(), peer.getLocalAddressString(), stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, - clientname, srcDataNode, datanode, requestedChecksum, + clientname, srcDataNode, datanode, dataset, requestedChecksum, cachingStrategy, allowLazyPersist, pinning); storageUuid = 
blockReceiver.getStorageUuid(); } else { - storageUuid = datanode.data.recoverClose( + storageUuid = dataset.recoverClose( block, latestGenerationStamp, minBytesRcvd); } @@ -890,6 +906,12 @@ class DataXceiver extends Receiver implements Runnable { final int csize = checksum.getChecksumSize(); final byte[] buffer = new byte[4*1024]; MessageDigest digester = MD5Hash.getDigester(); + final FsDatasetSpi<?> dataset = + datanode.getFSDataset(block.getBlockPoolId()); + if (dataset == null) { + throw new IOException( + "Unknown or unitialized blockpool " + block.getBlockPoolId()); + } long remaining = requestLength / bytesPerCRC * csize; for (int toDigest = 0; remaining > 0; remaining -= toDigest) { @@ -904,7 +926,7 @@ class DataXceiver extends Receiver implements Runnable { int partialLength = (int) (requestLength % bytesPerCRC); if (partialLength > 0) { byte[] buf = new byte[partialLength]; - final InputStream blockIn = datanode.data.getBlockInputStream(block, + final InputStream blockIn = dataset.getBlockInputStream(block, requestLength - partialLength); try { // Get the CRC of the partialLength. 
@@ -928,14 +950,20 @@ class DataXceiver extends Receiver implements Runnable { checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM, BlockTokenIdentifier.AccessMode.READ); // client side now can specify a range of the block for checksum + final FsDatasetSpi<?> dataset = + datanode.getFSDataset(block.getBlockPoolId()); + if (dataset == null) { + throw new IOException( + "Unknown or unitialized blockpool " + block.getBlockPoolId()); + } + long requestLength = block.getNumBytes(); Preconditions.checkArgument(requestLength >= 0); - long visibleLength = datanode.data.getReplicaVisibleLength(block); + long visibleLength = dataset.getReplicaVisibleLength(block); boolean partialBlk = requestLength < visibleLength; updateCurrentThreadName("Reading metadata for block " + block); - final LengthInputStream metadataIn = datanode.data - .getMetaDataInputStream(block); + final LengthInputStream metadataIn = dataset.getMetaDataInputStream(block); final DataInputStream checksumIn = new DataInputStream( new BufferedInputStream(metadataIn, ioFileBufferSize)); @@ -986,6 +1014,13 @@ class DataXceiver extends Receiver implements Runnable { @Override public void copyBlock(final ExtendedBlock block, final Token<BlockTokenIdentifier> blockToken) throws IOException { + final FsDatasetSpi<?> dataset = + datanode.getFSDataset(block.getBlockPoolId()); + if (dataset == null) { + throw new IOException( + "Unknown or unitialized blockpool " + block.getBlockPoolId()); + } + updateCurrentThreadName("Copying block " + block); // Read in the header if (datanode.isBlockTokenEnabled) { @@ -1002,7 +1037,7 @@ class DataXceiver extends Receiver implements Runnable { } - if (datanode.data.getPinning(block)) { + if (dataset.getPinning(block)) { String msg = "Not able to copy block " + block.getBlockId() + " " + "to " + peer.getRemoteAddressString() + " because it's pinned "; LOG.info(msg); @@ -1025,7 +1060,7 @@ class DataXceiver extends Receiver implements Runnable { try { // check if the block 
exists or not blockSender = new BlockSender(block, 0, -1, false, false, true, datanode, - null, CachingStrategy.newDropBehind()); + dataset, null, CachingStrategy.newDropBehind()); // set up response stream OutputStream baseStream = getOutputStream(); @@ -1073,6 +1108,13 @@ class DataXceiver extends Receiver implements Runnable { final Token<BlockTokenIdentifier> blockToken, final String delHint, final DatanodeInfo proxySource) throws IOException { + final FsDatasetSpi<?> dataset = + datanode.getFSDataset(block.getBlockPoolId()); + if (dataset == null) { + throw new IOException( + "Unknown or unitialized blockpool " + block.getBlockPoolId()); + } + updateCurrentThreadName("Replacing block " + block + " from " + delHint); /* read header */ @@ -1109,7 +1151,7 @@ class DataXceiver extends Receiver implements Runnable { try { // Move the block to different storage in the same datanode if (proxySource.equals(datanode.getDatanodeId())) { - ReplicaInfo oldReplica = datanode.data.moveBlockAcrossStorage(block, + ReplicaInfo oldReplica = dataset.moveBlockAcrossStorage(block, storageType); if (oldReplica != null) { LOG.info("Moved " + block + " from StorageType " @@ -1164,7 +1206,7 @@ class DataXceiver extends Receiver implements Runnable { blockReceiver = new BlockReceiver(block, storageType, proxyReply, proxySock.getRemoteSocketAddress().toString(), proxySock.getLocalSocketAddress().toString(), - null, 0, 0, 0, "", null, datanode, remoteChecksum, + null, 0, 0, 0, "", null, datanode, dataset, remoteChecksum, CachingStrategy.newDropBehind(), false, false); // receive a block http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java index 615abe9..2cc3516 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java @@ -433,8 +433,9 @@ public class VolumeScanner extends Thread { BlockSender blockSender = null; try { blockSender = new BlockSender(block, 0, -1, - false, true, true, datanode, null, - CachingStrategy.newDropBehind()); + false, true, true, datanode, + datanode.getFSDataset(block.getBlockPoolId()), + null, CachingStrategy.newDropBehind()); throttler.setBandwidth(bytesPerSec); long bytesRead = blockSender.sendBlock(nullStream, null, throttler); resultHandler.handle(block, null); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 76c4f02..a4672b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -41,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import 
org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataStorage; @@ -68,6 +70,7 @@ import org.apache.hadoop.util.ReflectionUtils; * The default implementation stores replicas on local drives. */ @InterfaceAudience.Private +@InterfaceStability.Unstable public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean { /** * A factory for creating {@link FsDatasetSpi} objects. @@ -83,9 +86,10 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean { return ReflectionUtils.newInstance(clazz, conf); } - /** Create a new object. */ - public abstract D newInstance(DataNode datanode, DataStorage storage, - Configuration conf) throws IOException; + /** Create a new dataset object for a specific service type. */ + public abstract D newInstance(DataNode datanode, + DataStorage storage, Configuration conf, + NodeType serviceType) throws IOException; /** Does the factory create simulated objects? 
*/ public boolean isSimulated() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java index 8d1bb2a..7c7b9a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java @@ -22,12 +22,16 @@ import java.io.File; import java.io.IOException; import java.nio.channels.ClosedChannelException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; /** * This is an interface for the underlying volume. 
*/ +@InterfaceAudience.Private +@InterfaceStability.Unstable public interface FsVolumeSpi { /** * Obtain a reference object that had increased 1 reference count of the http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java index 52e385b..01c3830 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java @@ -18,8 +18,11 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; @@ -28,9 +31,25 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; * A factory for creating {@link FsDatasetImpl} objects. 
*/ public class FsDatasetFactory extends FsDatasetSpi.Factory<FsDatasetImpl> { + + private final Map<NodeType, FsDatasetImpl> datasetMap = new HashMap<>(); + @Override - public FsDatasetImpl newInstance(DataNode datanode, - DataStorage storage, Configuration conf) throws IOException { - return new FsDatasetImpl(datanode, storage, conf); + public synchronized FsDatasetImpl newInstance(DataNode datanode, + DataStorage storage, Configuration conf, + NodeType serviceType) throws IOException { + FsDatasetImpl dataset = datasetMap.get(serviceType); + if (dataset != null) { + return dataset; + } + switch (serviceType) { + case NAME_NODE: + dataset = new FsDatasetImpl(datanode, storage, conf); + break; + default: + throw new IllegalArgumentException("Unsupported node type " + serviceType); + } + datasetMap.put(serviceType, dataset); + return dataset; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 8ebd214..999b827 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -308,7 +308,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(), blockChooserImpl); asyncDiskService = new FsDatasetAsyncDiskService(datanode, this); - asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode); + asyncLazyPersistService = new 
RamDiskAsyncLazyPersistService(datanode, this); deletingBlock = new HashMap<String, Set<Long>>(); for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { @@ -347,9 +347,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { /** * Gets initial volume failure information for all volumes that failed - * immediately at startup. The method works by determining the set difference - * between all configured storage locations and the actual storage locations in - * use after attempting to put all of them into service. + * immediately at startup. The method works by determining the set + * difference between all configured storage locations and the actual + * storage locations in use after attempting to put all of them into + * service. * * @return each storage location that has failed */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java index 884df2e..effbd4b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java @@ -53,6 +53,7 @@ class RamDiskAsyncLazyPersistService { private static final long THREADS_KEEP_ALIVE_SECONDS = 60; private final DataNode datanode; + private final FsDatasetImpl dataset; private final ThreadGroup threadGroup; private Map<File, ThreadPoolExecutor> executors = new HashMap<File, ThreadPoolExecutor>(); @@ 
-65,8 +66,10 @@ class RamDiskAsyncLazyPersistService { * The RamDiskAsyncLazyPersistService uses one ThreadPool per volume to do the async * disk operations. */ - RamDiskAsyncLazyPersistService(DataNode datanode) { + RamDiskAsyncLazyPersistService(DataNode datanode, + final FsDatasetImpl dataset) { this.datanode = datanode; + this.dataset = dataset; this.threadGroup = new ThreadGroup(getClass().getSimpleName()); } @@ -234,7 +237,6 @@ class RamDiskAsyncLazyPersistService { @Override public void run() { boolean succeeded = false; - final FsDatasetImpl dataset = (FsDatasetImpl)datanode.getFSDataset(); try (FsVolumeReference ref = this.targetVolume) { int smallBufferSize = DFSUtil.getSmallBufferSize(EMPTY_HDFS_CONF); // No FsDatasetImpl lock for the file copy http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java index 5c1b38f..d925b93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.datanode.*; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.test.GenericTestUtils; @@ -73,7 +74,8 @@ public class TestWriteBlockGetsBlockLengthHint { static class 
Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> { @Override public SimulatedFSDataset newInstance(DataNode datanode, - DataStorage storage, Configuration conf) throws IOException { + DataStorage storage, Configuration conf, + HdfsServerConstants.NodeType serviceType) throws IOException { return new FsDatasetChecker(storage, conf); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 778dd28..d957767 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; @@ -83,7 +84,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> { @Override public SimulatedFSDataset newInstance(DataNode datanode, - DataStorage storage, Configuration conf) throws IOException { + DataStorage storage, Configuration conf, + HdfsServerConstants.NodeType serviceType) throws 
IOException { return new SimulatedFSDataset(datanode, storage, conf); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 64cc78b..2059f1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyString; import java.io.File; import java.io.IOException; @@ -118,7 +119,8 @@ public class TestBPOfferService { mockFSDataset.addBlockPool(FAKE_BPID, conf); // Wire the dataset to the DN. - Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(); + Mockito.doReturn(mockFSDataset).when(mockDn).initBlockPool(Mockito.any(BPOfferService.class)); + Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(anyString()); } /** @@ -325,15 +327,16 @@ public class TestBPOfferService { Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")). 
when(mockDn).getMetrics(); final AtomicInteger count = new AtomicInteger(); - Mockito.doAnswer(new Answer<Void>() { + Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(anyString()); + Mockito.doAnswer(new Answer<FsDatasetSpi<?>>() { @Override - public Void answer(InvocationOnMock invocation) throws Throwable { + public FsDatasetSpi<?> answer(InvocationOnMock invocation) throws Throwable { if (count.getAndIncrement() == 0) { throw new IOException("faked initBlockPool exception"); } // The initBlockPool is called again. Now mock init is done. - Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(); - return null; + Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(anyString()); + return mockFSDataset; } }).when(mockDn).initBlockPool(Mockito.any(BPOfferService.class)); BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2); @@ -563,10 +566,10 @@ public class TestBPOfferService { assertSame(mockNN1, bpos.getActiveNN()); Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0)) .when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class), - Mockito.anyInt(), Mockito.anyString()); + Mockito.anyInt(), anyString()); Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1)) .when(mockNN2).errorReport(Mockito.any(DatanodeRegistration.class), - Mockito.anyInt(), Mockito.anyString()); + Mockito.anyInt(), anyString()); String errorString = "Can't send invalid block " + FAKE_BLOCK; bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString); bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString); @@ -614,7 +617,7 @@ public class TestBPOfferService { } } }).when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class), - Mockito.anyInt(), Mockito.anyString()); + Mockito.anyInt(), anyString()); String errorString = "Can't send invalid block " + FAKE_BLOCK; bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString); Thread.sleep(10000); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java index 07a26cc..ad4135b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.junit.Test; @@ -45,7 +46,7 @@ public class TestDataNodeInitStorage { @Override public SimulatedFsDatasetVerifier newInstance( DataNode datanode, DataStorage storage, - Configuration conf) throws IOException { + Configuration conf, HdfsServerConstants.NodeType serviceType) throws IOException { return new SimulatedFsDatasetVerifier(storage, conf); }
