HDFS-11190. [READ] Namenode support for data stored in external stores.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1b339163 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1b339163 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1b339163 Branch: refs/heads/HDFS-9806 Commit: 1b3391634db94abeeacdb382a016085543f8ebfe Parents: 56a5f37 Author: Virajith Jalaparti <[email protected]> Authored: Fri Apr 21 11:12:36 2017 -0700 Committer: Virajith Jalaparti <[email protected]> Committed: Tue Oct 24 11:10:22 2017 -0700 ---------------------------------------------------------------------- .../hadoop/hdfs/protocol/LocatedBlock.java | 96 ++++- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 + .../blockmanagement/BlockFormatProvider.java | 91 ++++ .../server/blockmanagement/BlockManager.java | 95 +++-- .../server/blockmanagement/BlockProvider.java | 65 +++ .../BlockStoragePolicySuite.java | 6 + .../blockmanagement/DatanodeDescriptor.java | 34 +- .../server/blockmanagement/DatanodeManager.java | 2 + .../blockmanagement/DatanodeStorageInfo.java | 4 + .../blockmanagement/LocatedBlockBuilder.java | 109 +++++ .../blockmanagement/ProvidedStorageMap.java | 427 +++++++++++++++++++ .../src/main/resources/hdfs-default.xml | 30 +- .../hadoop/hdfs/TestBlockStoragePolicy.java | 4 + .../blockmanagement/TestDatanodeManager.java | 65 ++- .../TestNameNodeProvidedImplementation.java | 345 +++++++++++++++ 15 files changed, 1292 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b339163/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java index 85bec92..5ad0bca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.protocol; import java.util.Arrays; +import java.util.Comparator; import java.util.List; import com.google.common.base.Preconditions; @@ -62,40 +63,50 @@ public class LocatedBlock { public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) { // By default, startOffset is unknown(-1) and corrupt is false. - this(b, locs, null, null, -1, false, EMPTY_LOCS); + this(b, convert(locs, null, null), null, null, -1, false, EMPTY_LOCS); } public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, String[] storageIDs, StorageType[] storageTypes) { - this(b, locs, storageIDs, storageTypes, -1, false, EMPTY_LOCS); + this(b, convert(locs, storageIDs, storageTypes), + storageIDs, storageTypes, -1, false, EMPTY_LOCS); } - public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, String[] storageIDs, - StorageType[] storageTypes, long startOffset, + public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, + String[] storageIDs, StorageType[] storageTypes, long startOffset, + boolean corrupt, DatanodeInfo[] cachedLocs) { + this(b, convert(locs, storageIDs, storageTypes), + storageIDs, storageTypes, startOffset, corrupt, + null == cachedLocs || 0 == cachedLocs.length ? 
EMPTY_LOCS : cachedLocs); + } + + public LocatedBlock(ExtendedBlock b, DatanodeInfoWithStorage[] locs, + String[] storageIDs, StorageType[] storageTypes, long startOffset, boolean corrupt, DatanodeInfo[] cachedLocs) { this.b = b; this.offset = startOffset; this.corrupt = corrupt; - if (locs==null) { - this.locs = EMPTY_LOCS; - } else { - this.locs = new DatanodeInfoWithStorage[locs.length]; - for(int i = 0; i < locs.length; i++) { - DatanodeInfo di = locs[i]; - DatanodeInfoWithStorage storage = new DatanodeInfoWithStorage(di, - storageIDs != null ? storageIDs[i] : null, - storageTypes != null ? storageTypes[i] : null); - this.locs[i] = storage; - } - } + this.locs = null == locs ? EMPTY_LOCS : locs; this.storageIDs = storageIDs; this.storageTypes = storageTypes; + this.cachedLocs = null == cachedLocs || 0 == cachedLocs.length + ? EMPTY_LOCS + : cachedLocs; + } + + private static DatanodeInfoWithStorage[] convert( + DatanodeInfo[] infos, String[] storageIDs, StorageType[] storageTypes) { + if (null == infos) { + return EMPTY_LOCS; + } - if (cachedLocs == null || cachedLocs.length == 0) { - this.cachedLocs = EMPTY_LOCS; - } else { - this.cachedLocs = cachedLocs; + DatanodeInfoWithStorage[] ret = new DatanodeInfoWithStorage[infos.length]; + for(int i = 0; i < infos.length; i++) { + ret[i] = new DatanodeInfoWithStorage(infos[i], + storageIDs != null ? storageIDs[i] : null, + storageTypes != null ? storageTypes[i] : null); } + return ret; } public Token<BlockTokenIdentifier> getBlockToken() { @@ -145,6 +156,51 @@ public class LocatedBlock { } } + /** + * Comparator that ensures that a PROVIDED storage type is greater than + * any other storage type. Any other storage types are considered equal. 
+   */
+  private static class ProvidedLastComparator
+      implements Comparator<DatanodeInfoWithStorage> {
+    @Override
+    public int compare(DatanodeInfoWithStorage dns1,
+        DatanodeInfoWithStorage dns2) {
+      if (StorageType.PROVIDED.equals(dns1.getStorageType())
+          && !StorageType.PROVIDED.equals(dns2.getStorageType())) {
+        return 1;
+      }
+      if (!StorageType.PROVIDED.equals(dns1.getStorageType())
+          && StorageType.PROVIDED.equals(dns2.getStorageType())) {
+        return -1;
+      }
+      // Storage types of dns1 and dns2 are now both provided or not provided;
+      // thus, are essentially equal for the purpose of this comparator.
+      return 0;
+    }
+  }
+
+  /**
+   * Moves all locations that have {@link StorageType}
+   * {@code PROVIDED} to the end of the locations array without
+   * changing the relative ordering of the remaining locations.
+   * Only the first {@code activeLen} locations are considered.
+   * The caller must immediately invoke {@link
+   * org.apache.hadoop.hdfs.protocol.LocatedBlock#updateCachedStorageInfo}
+   * to update the cached Storage ID/Type arrays.
+   * @param activeLen number of leading locations to reorder; values <= 0 are a no-op
+   */
+  public void moveProvidedToEnd(int activeLen) {
+
+    if (activeLen <= 0) {
+      return;
+    }
+    // as this is a stable sort, for elements that are equal,
+    // the current order of the elements is maintained
+    Arrays.sort(locs, 0,
+        (activeLen < locs.length) ? 
activeLen : locs.length, + new ProvidedLastComparator()); + } + public long getStartOffset() { return offset; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b339163/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 1b50081..6458112 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -328,6 +328,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.namenode.edits.asynclogging"; public static final boolean DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT = true; + public static final String DFS_NAMENODE_PROVIDED_ENABLED = "dfs.namenode.provided.enabled"; + public static final boolean DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT = false; + + public static final String DFS_NAMENODE_BLOCK_PROVIDER_CLASS = "dfs.namenode.block.provider.class"; + public static final String DFS_PROVIDER_CLASS = "dfs.provider.class"; public static final String DFS_PROVIDER_DF_CLASS = "dfs.provided.df.class"; public static final String DFS_PROVIDER_STORAGEUUID = "dfs.provided.storage.id"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b339163/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java new file mode 100644 index 0000000..930263d --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.BlockAlias;
+import org.apache.hadoop.hdfs.server.common.BlockFormat;
+import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Loads provided blocks from a {@link BlockFormat}.
+ */
+public class BlockFormatProvider extends BlockProvider
+    implements Configurable {
+
+  private Configuration conf;
+  private BlockFormat<? extends BlockAlias> blockFormat;
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlockFormatProvider.class);
+
+  @Override
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void setConf(Configuration conf) {
+    Class<? extends BlockFormat> c = conf.getClass(
+        DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
+        TextFileRegionFormat.class, BlockFormat.class);
+    blockFormat = ReflectionUtils.newInstance(c, conf);
+    LOG.info("Loaded BlockFormat class : {}", c.getName());
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public Iterator<Block> iterator() {
+    try {
+      final BlockFormat.Reader<? extends BlockAlias> reader =
+          blockFormat.getReader(null);
+
+      return new Iterator<Block>() {
+
+        private final Iterator<? extends BlockAlias> inner = reader.iterator();
+
+        @Override
+        public boolean hasNext() {
+          return inner.hasNext();
+        }
+
+        @Override
+        public Block next() {
+          return inner.next().getBlock();
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+      };
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to read provided blocks", e);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b339163/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 6cd67f6..1b22964 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -430,6 +430,9 @@ public class BlockManager implements BlockStatsMXBean {
    */
   private final short minReplicationToBeInMaintenance;
 
+  /** Storages accessible from multiple DNs.
*/ + private final ProvidedStorageMap providedStorageMap; + public BlockManager(final Namesystem namesystem, boolean haEnabled, final Configuration conf) throws IOException { this.namesystem = namesystem; @@ -462,6 +465,8 @@ public class BlockManager implements BlockStatsMXBean { blockTokenSecretManager = createBlockTokenSecretManager(conf); + providedStorageMap = new ProvidedStorageMap(namesystem, this, conf); + this.maxCorruptFilesReturned = conf.getInt( DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY, DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED); @@ -1132,7 +1137,7 @@ public class BlockManager implements BlockStatsMXBean { final long fileLength = bc.computeContentSummary( getStoragePolicySuite()).getLength(); final long pos = fileLength - lastBlock.getNumBytes(); - return createLocatedBlock(lastBlock, pos, + return createLocatedBlock(null, lastBlock, pos, BlockTokenIdentifier.AccessMode.WRITE); } @@ -1153,8 +1158,10 @@ public class BlockManager implements BlockStatsMXBean { return locations; } - private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks, - final long offset, final long length, final int nrBlocksToReturn, + private void createLocatedBlockList( + LocatedBlockBuilder locatedBlocks, + final BlockInfo[] blocks, + final long offset, final long length, final AccessMode mode) throws IOException { int curBlk; long curPos = 0, blkSize = 0; @@ -1169,21 +1176,22 @@ public class BlockManager implements BlockStatsMXBean { } if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file - return Collections.emptyList(); + return; long endOff = offset + length; - List<LocatedBlock> results = new ArrayList<>(blocks.length); do { - results.add(createLocatedBlock(blocks[curBlk], curPos, mode)); + locatedBlocks.addBlock( + createLocatedBlock(locatedBlocks, blocks[curBlk], curPos, mode)); curPos += blocks[curBlk].getNumBytes(); curBlk++; } while (curPos < endOff && curBlk < blocks.length - && results.size() < nrBlocksToReturn); - 
return results; + && !locatedBlocks.isBlockMax()); + return; } - private LocatedBlock createLocatedBlock(final BlockInfo[] blocks, + private LocatedBlock createLocatedBlock(LocatedBlockBuilder locatedBlocks, + final BlockInfo[] blocks, final long endPos, final AccessMode mode) throws IOException { int curBlk; long curPos = 0; @@ -1196,12 +1204,13 @@ public class BlockManager implements BlockStatsMXBean { curPos += blkSize; } - return createLocatedBlock(blocks[curBlk], curPos, mode); + return createLocatedBlock(locatedBlocks, blocks[curBlk], curPos, mode); } - private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos, - final AccessMode mode) throws IOException { - final LocatedBlock lb = createLocatedBlock(blk, pos); + private LocatedBlock createLocatedBlock(LocatedBlockBuilder locatedBlocks, + final BlockInfo blk, final long pos, final AccessMode mode) + throws IOException { + final LocatedBlock lb = createLocatedBlock(locatedBlocks, blk, pos); if (mode != null) { setBlockToken(lb, mode); } @@ -1209,21 +1218,24 @@ public class BlockManager implements BlockStatsMXBean { } /** @return a LocatedBlock for the given block */ - private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos) - throws IOException { + private LocatedBlock createLocatedBlock(LocatedBlockBuilder locatedBlocks, + final BlockInfo blk, final long pos) throws IOException { if (!blk.isComplete()) { final BlockUnderConstructionFeature uc = blk.getUnderConstructionFeature(); if (blk.isStriped()) { final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(), blk); + //TODO use locatedBlocks builder?? 
return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos, false); } else { final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(), blk); - return newLocatedBlock(eb, storages, pos, false); + return null == locatedBlocks + ? newLocatedBlock(eb, storages, pos, false) + : locatedBlocks.newLocatedBlock(eb, storages, pos, false); } } @@ -1287,9 +1299,10 @@ public class BlockManager implements BlockStatsMXBean { " numCorrupt: " + numCorruptNodes + " numCorruptRepls: " + numCorruptReplicas; final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(), blk); - return blockIndices == null ? - newLocatedBlock(eb, machines, pos, isCorrupt) : - newLocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt); + return blockIndices == null + ? null == locatedBlocks ? newLocatedBlock(eb, machines, pos, isCorrupt) + : locatedBlocks.newLocatedBlock(eb, machines, pos, isCorrupt) + : newLocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt); } /** Create a LocatedBlocks. */ @@ -1311,27 +1324,31 @@ public class BlockManager implements BlockStatsMXBean { LOG.debug("blocks = {}", java.util.Arrays.asList(blocks)); } final AccessMode mode = needBlockToken? BlockTokenIdentifier.AccessMode.READ: null; - final List<LocatedBlock> locatedblocks = createLocatedBlockList( - blocks, offset, length, Integer.MAX_VALUE, mode); - final LocatedBlock lastlb; - final boolean isComplete; + LocatedBlockBuilder locatedBlocks = providedStorageMap + .newLocatedBlocks(Integer.MAX_VALUE) + .fileLength(fileSizeExcludeBlocksUnderConstruction) + .lastUC(isFileUnderConstruction) + .encryption(feInfo) + .erasureCoding(ecPolicy); + + createLocatedBlockList(locatedBlocks, blocks, offset, length, mode); if (!inSnapshot) { final BlockInfo last = blocks[blocks.length - 1]; final long lastPos = last.isComplete()? 
fileSizeExcludeBlocksUnderConstruction - last.getNumBytes() : fileSizeExcludeBlocksUnderConstruction; - lastlb = createLocatedBlock(last, lastPos, mode); - isComplete = last.isComplete(); + + locatedBlocks + .lastBlock(createLocatedBlock(locatedBlocks, last, lastPos, mode)) + .lastComplete(last.isComplete()); } else { - lastlb = createLocatedBlock(blocks, - fileSizeExcludeBlocksUnderConstruction, mode); - isComplete = true; + locatedBlocks + .lastBlock(createLocatedBlock(locatedBlocks, blocks, + fileSizeExcludeBlocksUnderConstruction, mode)) + .lastComplete(true); } - LocatedBlocks locations = new LocatedBlocks( - fileSizeExcludeBlocksUnderConstruction, - isFileUnderConstruction, locatedblocks, lastlb, isComplete, feInfo, - ecPolicy); + LocatedBlocks locations = locatedBlocks.build(); // Set caching information for the located blocks. CacheManager cm = namesystem.getCacheManager(); if (cm != null) { @@ -2433,7 +2450,10 @@ public class BlockManager implements BlockStatsMXBean { // To minimize startup time, we discard any second (or later) block reports // that we receive while still in startup phase. - DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID()); + // !#! Register DN with provided storage, not with storage owned by DN + // !#! DN should still have a ref to the DNStorageInfo + DatanodeStorageInfo storageInfo = + providedStorageMap.getStorage(node, storage); if (storageInfo == null) { // We handle this for backwards compatibility. 
@@ -2465,9 +2485,12 @@ public class BlockManager implements BlockStatsMXBean { nodeID.getDatanodeUuid()); processFirstBlockReport(storageInfo, newReport); } else { - invalidatedBlocks = processReport(storageInfo, newReport, context); + // Block reports for provided storage are not + // maintained by DN heartbeats + if (!StorageType.PROVIDED.equals(storageInfo.getStorageType())) { + invalidatedBlocks = processReport(storageInfo, newReport, context); + } } - storageInfo.receivedBlockReport(); } finally { endTime = Time.monotonicNow(); @@ -2681,7 +2704,7 @@ public class BlockManager implements BlockStatsMXBean { * @param report - the initial block report, to be processed * @throws IOException */ - private void processFirstBlockReport( + void processFirstBlockReport( final DatanodeStorageInfo storageInfo, final BlockListAsLongs report) throws IOException { if (report == null) return; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b339163/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java new file mode 100644 index 0000000..d8bed16 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.ProvidedStorageMap.ProvidedBlockList;
+import org.apache.hadoop.hdfs.util.RwLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to load provided blocks in the {@link BlockManager}.
+ */
+public abstract class BlockProvider implements Iterable<Block> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BlockProvider.class);
+
+  private RwLock lock;
+  private BlockManager bm;
+  private DatanodeStorageInfo storage;
+  private boolean hasDNs = false;
+
+  /**
+   * @param lock the namesystem lock
+   * @param bm block manager
+   * @param storage storage for provided blocks
+   */
+  void init(RwLock lock, BlockManager bm, DatanodeStorageInfo storage) {
+    this.bm = bm;
+    this.lock = lock;
+    this.storage = storage;
+  }
+
+  /**
+   * Start processing the block report for provided blocks; runs only once.
+   * @throws IOException propagated from processing the first block report
+   */
+  void start() throws IOException {
+    assert lock.hasWriteLock() : "Not holding write lock";
+    if (hasDNs) {
+      return;
+    }
+    LOG.info("Calling process first blk report from storage: {}", storage);
+    // first pass; periodic refresh should call bm.processReport
+    bm.processFirstBlockReport(storage, new ProvidedBlockList(iterator()));
+    hasDNs = true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b339163/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
index c8923da..6ea5198 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
@@ -82,6 +82,12 @@ public class BlockStoragePolicySuite {
         HdfsConstants.COLD_STORAGE_POLICY_NAME,
         new StorageType[]{StorageType.ARCHIVE}, StorageType.EMPTY_ARRAY,
         StorageType.EMPTY_ARRAY);
+    final byte providedId = HdfsConstants.PROVIDED_STORAGE_POLICY_ID;
+    policies[providedId] = new BlockStoragePolicy(providedId,
+        HdfsConstants.PROVIDED_STORAGE_POLICY_NAME,
+        new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
+        new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
+        new StorageType[]{StorageType.PROVIDED, StorageType.DISK});
     return new BlockStoragePolicySuite(hotId, policies);
   }
 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b339163/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index d35894c..28a3d1a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -151,7 +151,7 @@ public class DatanodeDescriptor extends DatanodeInfo { private final LeavingServiceStatus leavingServiceStatus = new LeavingServiceStatus(); - private final Map<String, DatanodeStorageInfo> storageMap = + protected final Map<String, DatanodeStorageInfo> storageMap = new HashMap<>(); /** @@ -322,6 +322,12 @@ public class DatanodeDescriptor extends DatanodeInfo { boolean hasStaleStorages() { synchronized (storageMap) { for (DatanodeStorageInfo storage : storageMap.values()) { + if (StorageType.PROVIDED.equals(storage.getStorageType())) { + // to verify provided storage participated in this hb, requires + // check to pass DNDesc. 
+ // e.g., storageInfo.verifyBlockReportId(this, curBlockReportId) + continue; + } if (storage.areBlockContentsStale()) { return true; } @@ -443,17 +449,22 @@ public class DatanodeDescriptor extends DatanodeInfo { this.volumeFailures = volFailures; this.volumeFailureSummary = volumeFailureSummary; for (StorageReport report : reports) { + totalCapacity += report.getCapacity(); + totalRemaining += report.getRemaining(); + totalBlockPoolUsed += report.getBlockPoolUsed(); + totalDfsUsed += report.getDfsUsed(); + totalNonDfsUsed += report.getNonDfsUsed(); + + if (StorageType.PROVIDED.equals( + report.getStorage().getStorageType())) { + continue; + } DatanodeStorageInfo storage = updateStorage(report.getStorage()); if (checkFailedStorages) { failedStorageInfos.remove(storage); } storage.receivedHeartbeat(report); - totalCapacity += report.getCapacity(); - totalRemaining += report.getRemaining(); - totalBlockPoolUsed += report.getBlockPoolUsed(); - totalDfsUsed += report.getDfsUsed(); - totalNonDfsUsed += report.getNonDfsUsed(); } // Update total metrics for the node. @@ -474,6 +485,17 @@ public class DatanodeDescriptor extends DatanodeInfo { } } + void injectStorage(DatanodeStorageInfo s) { + synchronized (storageMap) { + DatanodeStorageInfo storage = storageMap.get(s.getStorageID()); + if (null == storage) { + storageMap.put(s.getStorageID(), s); + } else { + assert storage == s : "found " + storage + " expected " + s; + } + } + } + /** * Remove stale storages from storageMap. We must not remove any storages * as long as they have associated block replicas. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b339163/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index c75bcea..a7e31a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -532,6 +532,8 @@ public class DatanodeManager { } else { networktopology.sortByDistance(client, lb.getLocations(), activeLen); } + //move PROVIDED storage to the end to prefer local replicas. + lb.moveProvidedToEnd(activeLen); // must update cache since we modified locations array lb.updateCachedStorageInfo(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b339163/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index b1ccea2..76bf915 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -172,6 +172,10 @@ public class DatanodeStorageInfo { this.state = state; } + void setHeartbeatedSinceFailover(boolean value) { + heartbeatedSinceFailover 
= value; + } + boolean areBlocksOnFailedStorage() { return getState() == State.FAILED && !blocks.isEmpty(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b339163/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LocatedBlockBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LocatedBlockBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LocatedBlockBuilder.java new file mode 100644 index 0000000..0056887 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LocatedBlockBuilder.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class LocatedBlockBuilder {
+
+  protected long flen;
+  protected List<LocatedBlock> blocks = Collections.<LocatedBlock>emptyList();
+  protected boolean isUC;
+  protected LocatedBlock last;
+  protected boolean lastComplete;
+  protected FileEncryptionInfo feInfo;
+  private final int maxBlocks;
+  protected ErasureCodingPolicy ecPolicy;
+
+  LocatedBlockBuilder(int maxBlocks) {
+    this.maxBlocks = maxBlocks;
+  }
+
+  boolean isBlockMax() {
+    return blocks.size() >= maxBlocks;
+  }
+
+  LocatedBlockBuilder fileLength(long fileLength) {
+    flen = fileLength;
+    return this;
+  }
+
+  LocatedBlockBuilder addBlock(LocatedBlock block) {
+    if (blocks.isEmpty()) {
+      blocks = new ArrayList<>();
+    }
+    blocks.add(block);
+    return this;
+  }
+
+  // return new block so tokens can be set
+  LocatedBlock newLocatedBlock(ExtendedBlock eb,
+      DatanodeStorageInfo[] storage,
+      long pos, boolean isCorrupt) {
+    LocatedBlock blk =
+        BlockManager.newLocatedBlock(eb, storage, pos, isCorrupt);
+    return blk;
+  }
+
+  LocatedBlockBuilder lastUC(boolean underConstruction) {
+    isUC = underConstruction;
+    return this;
+  }
+
+  LocatedBlockBuilder lastBlock(LocatedBlock block) {
+    last = block;
+    return this;
+  }
+
+  LocatedBlockBuilder lastComplete(boolean complete) {
+    lastComplete = complete;
+    return this;
+  }
+
+  LocatedBlockBuilder encryption(FileEncryptionInfo fileEncryptionInfo) {
+    feInfo = fileEncryptionInfo;
+    return this;
+  }
+
+  LocatedBlockBuilder erasureCoding(ErasureCodingPolicy codingPolicy) {
+    ecPolicy = codingPolicy;
+    return this;
+  }
+
+  LocatedBlocks build(DatanodeDescriptor client) {
+    return build();
+  }
+
+  LocatedBlocks build() {
+    return new LocatedBlocks(flen, isUC, blocks, last,
+        lastComplete, feInfo, ecPolicy);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b339163/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
new file mode 100644
index 0000000..d222344
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; +import org.apache.hadoop.hdfs.util.RwLock; +import org.apache.hadoop.util.ReflectionUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.protobuf.ByteString; + +/** + * This class allows us to manage and multiplex between storages local to + * datanodes, and provided storage. 
+ */ +public class ProvidedStorageMap { + + private static final Logger LOG = + LoggerFactory.getLogger(ProvidedStorageMap.class); + + // limit to a single provider for now + private final BlockProvider blockProvider; + private final String storageId; + private final ProvidedDescriptor providedDescriptor; + private final DatanodeStorageInfo providedStorageInfo; + private boolean providedEnabled; + + ProvidedStorageMap(RwLock lock, BlockManager bm, Configuration conf) + throws IOException { + + storageId = conf.get(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID, + DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT); + + providedEnabled = conf.getBoolean( + DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, + DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT); + + if (!providedEnabled) { + // disable mapping + blockProvider = null; + providedDescriptor = null; + providedStorageInfo = null; + return; + } + + DatanodeStorage ds = new DatanodeStorage( + storageId, State.NORMAL, StorageType.PROVIDED); + providedDescriptor = new ProvidedDescriptor(); + providedStorageInfo = providedDescriptor.createProvidedStorage(ds); + + // load block reader into storage + Class<? extends BlockProvider> fmt = conf.getClass( + DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS, + BlockFormatProvider.class, BlockProvider.class); + + blockProvider = ReflectionUtils.newInstance(fmt, conf); + blockProvider.init(lock, bm, providedStorageInfo); + LOG.info("Loaded block provider class: " + + blockProvider.getClass() + " storage: " + providedStorageInfo); + } + + /** + * @param dn datanode descriptor + * @param s data node storage + * @return the {@link DatanodeStorageInfo} for the specified datanode. + * If {@code s} corresponds to a provided storage, the storage info + * representing provided storage is returned. 
+ * @throws IOException + */ + DatanodeStorageInfo getStorage(DatanodeDescriptor dn, DatanodeStorage s) + throws IOException { + if (providedEnabled && storageId.equals(s.getStorageID())) { + if (StorageType.PROVIDED.equals(s.getStorageType())) { + // poll service, initiate + blockProvider.start(); + dn.injectStorage(providedStorageInfo); + return providedDescriptor.getProvidedStorage(dn, s); + } + LOG.warn("Reserved storage {} reported as non-provided from {}", s, dn); + } + return dn.getStorageInfo(s.getStorageID()); + } + + public LocatedBlockBuilder newLocatedBlocks(int maxValue) { + if (!providedEnabled) { + return new LocatedBlockBuilder(maxValue); + } + return new ProvidedBlocksBuilder(maxValue); + } + + /** + * Builder used for creating {@link LocatedBlocks} when a block is provided. + */ + class ProvidedBlocksBuilder extends LocatedBlockBuilder { + + private ShadowDatanodeInfoWithStorage pending; + + ProvidedBlocksBuilder(int maxBlocks) { + super(maxBlocks); + pending = new ShadowDatanodeInfoWithStorage( + providedDescriptor, storageId); + } + + @Override + LocatedBlock newLocatedBlock(ExtendedBlock eb, + DatanodeStorageInfo[] storages, long pos, boolean isCorrupt) { + + DatanodeInfoWithStorage[] locs = + new DatanodeInfoWithStorage[storages.length]; + String[] sids = new String[storages.length]; + StorageType[] types = new StorageType[storages.length]; + for (int i = 0; i < storages.length; ++i) { + sids[i] = storages[i].getStorageID(); + types[i] = storages[i].getStorageType(); + if (StorageType.PROVIDED.equals(storages[i].getStorageType())) { + locs[i] = pending; + } else { + locs[i] = new DatanodeInfoWithStorage( + storages[i].getDatanodeDescriptor(), sids[i], types[i]); + } + } + return new LocatedBlock(eb, locs, sids, types, pos, isCorrupt, null); + } + + @Override + LocatedBlocks build(DatanodeDescriptor client) { + // TODO: to support multiple provided storages, need to pass/maintain map + // set all fields of pending DatanodeInfo + List<String> 
excludedUUids = new ArrayList<String>(); + for (LocatedBlock b: blocks) { + DatanodeInfo[] infos = b.getLocations(); + StorageType[] types = b.getStorageTypes(); + + for (int i = 0; i < types.length; i++) { + if (!StorageType.PROVIDED.equals(types[i])) { + excludedUUids.add(infos[i].getDatanodeUuid()); + } + } + } + + DatanodeDescriptor dn = providedDescriptor.choose(client, excludedUUids); + if (dn == null) { + dn = providedDescriptor.choose(client); + } + + pending.replaceInternal(dn); + return new LocatedBlocks( + flen, isUC, blocks, last, lastComplete, feInfo, ecPolicy); + } + + @Override + LocatedBlocks build() { + return build(providedDescriptor.chooseRandom()); + } + } + + /** + * An abstract {@link DatanodeInfoWithStorage} to represent provided storage. + */ + static class ShadowDatanodeInfoWithStorage extends DatanodeInfoWithStorage { + private String shadowUuid; + + ShadowDatanodeInfoWithStorage(DatanodeDescriptor d, String storageId) { + super(d, storageId, StorageType.PROVIDED); + } + + @Override + public String getDatanodeUuid() { + return shadowUuid; + } + + public void setDatanodeUuid(String uuid) { + shadowUuid = uuid; + } + + void replaceInternal(DatanodeDescriptor dn) { + updateRegInfo(dn); // overwrite DatanodeID (except UUID) + setDatanodeUuid(dn.getDatanodeUuid()); + setCapacity(dn.getCapacity()); + setDfsUsed(dn.getDfsUsed()); + setRemaining(dn.getRemaining()); + setBlockPoolUsed(dn.getBlockPoolUsed()); + setCacheCapacity(dn.getCacheCapacity()); + setCacheUsed(dn.getCacheUsed()); + setLastUpdate(dn.getLastUpdate()); + setLastUpdateMonotonic(dn.getLastUpdateMonotonic()); + setXceiverCount(dn.getXceiverCount()); + setNetworkLocation(dn.getNetworkLocation()); + adminState = dn.getAdminState(); + setUpgradeDomain(dn.getUpgradeDomain()); + } + + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } + + @Override + public int hashCode() { + return super.hashCode(); + } + } + + /** + * An abstract DatanodeDescriptor to track 
datanodes with provided storages. + * NOTE: never resolved through registerDatanode, so not in the topology. + */ + static class ProvidedDescriptor extends DatanodeDescriptor { + + private final NavigableMap<String, DatanodeDescriptor> dns = + new ConcurrentSkipListMap<>(); + + ProvidedDescriptor() { + super(new DatanodeID( + null, // String ipAddr, + null, // String hostName, + UUID.randomUUID().toString(), // String datanodeUuid, + 0, // int xferPort, + 0, // int infoPort, + 0, // int infoSecurePort, + 0)); // int ipcPort + } + + DatanodeStorageInfo getProvidedStorage( + DatanodeDescriptor dn, DatanodeStorage s) { + dns.put(dn.getDatanodeUuid(), dn); + // TODO: maintain separate RPC ident per dn + return storageMap.get(s.getStorageID()); + } + + DatanodeStorageInfo createProvidedStorage(DatanodeStorage ds) { + assert null == storageMap.get(ds.getStorageID()); + DatanodeStorageInfo storage = new DatanodeStorageInfo(this, ds); + storage.setHeartbeatedSinceFailover(true); + storageMap.put(storage.getStorageID(), storage); + return storage; + } + + DatanodeDescriptor choose(DatanodeDescriptor client) { + // exact match for now + DatanodeDescriptor dn = dns.get(client.getDatanodeUuid()); + if (null == dn) { + dn = chooseRandom(); + } + return dn; + } + + DatanodeDescriptor choose(DatanodeDescriptor client, + List<String> excludedUUids) { + // exact match for now + DatanodeDescriptor dn = dns.get(client.getDatanodeUuid()); + + if (null == dn || excludedUUids.contains(client.getDatanodeUuid())) { + dn = null; + Set<String> exploredUUids = new HashSet<String>(); + + while(exploredUUids.size() < dns.size()) { + Map.Entry<String, DatanodeDescriptor> d = + dns.ceilingEntry(UUID.randomUUID().toString()); + if (null == d) { + d = dns.firstEntry(); + } + String uuid = d.getValue().getDatanodeUuid(); + //this node has already been explored, and was not selected earlier + if (exploredUUids.contains(uuid)) { + continue; + } + exploredUUids.add(uuid); + //this node has been 
excluded + if (excludedUUids.contains(uuid)) { + continue; + } + return dns.get(uuid); + } + } + + return dn; + } + + DatanodeDescriptor chooseRandom(DatanodeStorageInfo[] excludedStorages) { + // TODO: Currently this is not uniformly random; + // skewed toward sparse sections of the ids + Set<DatanodeDescriptor> excludedNodes = + new HashSet<DatanodeDescriptor>(); + if (excludedStorages != null) { + for (int i= 0; i < excludedStorages.length; i++) { + LOG.info("Excluded: " + excludedStorages[i].getDatanodeDescriptor()); + excludedNodes.add(excludedStorages[i].getDatanodeDescriptor()); + } + } + Set<DatanodeDescriptor> exploredNodes = new HashSet<DatanodeDescriptor>(); + + while(exploredNodes.size() < dns.size()) { + Map.Entry<String, DatanodeDescriptor> d = + dns.ceilingEntry(UUID.randomUUID().toString()); + if (null == d) { + d = dns.firstEntry(); + } + DatanodeDescriptor node = d.getValue(); + //this node has already been explored, and was not selected earlier + if (exploredNodes.contains(node)) { + continue; + } + exploredNodes.add(node); + //this node has been excluded + if (excludedNodes.contains(node)) { + continue; + } + return node; + } + return null; + } + + DatanodeDescriptor chooseRandom() { + return chooseRandom(null); + } + + @Override + void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) { + // pick a random datanode, delegate to it + DatanodeDescriptor node = chooseRandom(targets); + if (node != null) { + node.addBlockToBeReplicated(block, targets); + } else { + LOG.error("Cannot find a source node to replicate block: " + + block + " from"); + } + } + + @Override + public boolean equals(Object obj) { + return (this == obj) || super.equals(obj); + } + + @Override + public int hashCode() { + return super.hashCode(); + } + } + + /** + * Used to emulate block reports for provided blocks. 
+ */ + static class ProvidedBlockList extends BlockListAsLongs { + + private final Iterator<Block> inner; + + ProvidedBlockList(Iterator<Block> inner) { + this.inner = inner; + } + + @Override + public Iterator<BlockReportReplica> iterator() { + return new Iterator<BlockReportReplica>() { + @Override + public BlockReportReplica next() { + return new BlockReportReplica(inner.next()); + } + @Override + public boolean hasNext() { + return inner.hasNext(); + } + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public int getNumberOfBlocks() { + // VERIFY: only printed for debugging + return -1; + } + + @Override + public ByteString getBlocksBuffer() { + throw new UnsupportedOperationException(); + } + + @Override + public long[] getBlockListAsLongs() { + // should only be used for backwards compat, DN.ver > NN.ver + throw new UnsupportedOperationException(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b339163/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 0b97667..aa669ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4577,14 +4577,30 @@ </property> <property> + <name>dfs.namenode.provided.enabled</name> + <value>false</value> + <description> + Enables the Namenode to handle provided storages. + </description> + </property> + + <property> + <name>dfs.namenode.block.provider.class</name> + <value>org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider</value> + <description> + The class that is used to load provided blocks in the Namenode. 
+ </description> + </property> + + <property> <name>dfs.provider.class</name> <value>org.apache.hadoop.hdfs.server.common.TextFileRegionProvider</value> <description> - The class that is used to load information about blocks stored in - provided storages. - org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TextFileRegionProvider - is used as the default, which expects the blocks to be specified - using a delimited text file. + The class that is used to load information about blocks stored in + provided storages. + org.apache.hadoop.hdfs.server.common.TextFileRegionProvider + is used as the default, which expects the blocks to be specified + using a delimited text file. </description> </property> @@ -4592,7 +4608,7 @@ <name>dfs.provided.df.class</name> <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.DefaultProvidedVolumeDF</value> <description> - The class that is used to measure usage statistics of provided stores. + The class that is used to measure usage statistics of provided stores. </description> </property> @@ -4600,7 +4616,7 @@ <name>dfs.provided.storage.id</name> <value>DS-PROVIDED</value> <description> - The storage ID used for provided stores. + The storage ID used for provided stores. 
</description> </property> http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b339163/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java index 3a8fb59..12045a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java @@ -84,6 +84,7 @@ public class TestBlockStoragePolicy { static final byte ONESSD = HdfsConstants.ONESSD_STORAGE_POLICY_ID; static final byte ALLSSD = HdfsConstants.ALLSSD_STORAGE_POLICY_ID; static final byte LAZY_PERSIST = HdfsConstants.MEMORY_STORAGE_POLICY_ID; + static final byte PROVIDED = HdfsConstants.PROVIDED_STORAGE_POLICY_ID; @Test (timeout=300000) public void testConfigKeyEnabled() throws IOException { @@ -143,6 +144,9 @@ public class TestBlockStoragePolicy { expectedPolicyStrings.put(ALLSSD, "BlockStoragePolicy{ALL_SSD:" + ALLSSD + ", storageTypes=[SSD], creationFallbacks=[DISK], " + "replicationFallbacks=[DISK]}"); + expectedPolicyStrings.put(PROVIDED, "BlockStoragePolicy{PROVIDED:" + PROVIDED + + ", storageTypes=[PROVIDED, DISK], creationFallbacks=[PROVIDED, DISK], " + + "replicationFallbacks=[PROVIDED, DISK]}"); for(byte i = 1; i < 16; i++) { final BlockStoragePolicy policy = POLICY_SUITE.getPolicy(i); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b339163/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java index 286f4a4..81405eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java @@ -300,7 +300,7 @@ public class TestDatanodeManager { */ @Test public void testSortLocatedBlocks() throws IOException, URISyntaxException { - HelperFunction(null); + HelperFunction(null, 0); } /** @@ -312,7 +312,7 @@ public class TestDatanodeManager { */ @Test public void testgoodScript() throws IOException, URISyntaxException { - HelperFunction("/" + Shell.appendScriptExtension("topology-script")); + HelperFunction("/" + Shell.appendScriptExtension("topology-script"), 0); } @@ -325,7 +325,21 @@ public class TestDatanodeManager { */ @Test public void testBadScript() throws IOException, URISyntaxException { - HelperFunction("/"+ Shell.appendScriptExtension("topology-broken-script")); + HelperFunction("/"+ Shell.appendScriptExtension("topology-broken-script"), 0); + } + + /** + * Test with different sorting functions but include datanodes + * with provided storage + * @throws IOException + * @throws URISyntaxException + */ + @Test + public void testWithProvidedTypes() throws IOException, URISyntaxException { + HelperFunction(null, 1); + HelperFunction(null, 3); + HelperFunction("/" + Shell.appendScriptExtension("topology-script"), 1); + HelperFunction("/" + Shell.appendScriptExtension("topology-script"), 2); } /** @@ -333,11 +347,12 @@ public class TestDatanodeManager { * we invoke this function with and without topology scripts * * @param scriptFileName - Script Name or null + * @param providedStorages - number of provided storages to add * * @throws 
URISyntaxException * @throws IOException */ - public void HelperFunction(String scriptFileName) + public void HelperFunction(String scriptFileName, int providedStorages) throws URISyntaxException, IOException { // create the DatanodeManager which will be tested Configuration conf = new Configuration(); @@ -352,17 +367,25 @@ public class TestDatanodeManager { } DatanodeManager dm = mockDatanodeManager(fsn, conf); + int totalDNs = 5 + providedStorages; + // register 5 datanodes, each with different storage ID and type - DatanodeInfo[] locs = new DatanodeInfo[5]; - String[] storageIDs = new String[5]; - StorageType[] storageTypes = new StorageType[]{ - StorageType.ARCHIVE, - StorageType.DEFAULT, - StorageType.DISK, - StorageType.RAM_DISK, - StorageType.SSD - }; - for (int i = 0; i < 5; i++) { + DatanodeInfo[] locs = new DatanodeInfo[totalDNs]; + String[] storageIDs = new String[totalDNs]; + List<StorageType> storageTypesList = new ArrayList<>( + Arrays.asList(StorageType.ARCHIVE, + StorageType.DEFAULT, + StorageType.DISK, + StorageType.RAM_DISK, + StorageType.SSD)); + + for (int i = 0; i < providedStorages; i++) { + storageTypesList.add(StorageType.PROVIDED); + } + + StorageType[] storageTypes= storageTypesList.toArray(new StorageType[0]); + + for (int i = 0; i < totalDNs; i++) { // register new datanode String uuid = "UUID-" + i; String ip = "IP-" + i; @@ -398,9 +421,9 @@ public class TestDatanodeManager { DatanodeInfo[] sortedLocs = block.getLocations(); storageIDs = block.getStorageIDs(); storageTypes = block.getStorageTypes(); - assertThat(sortedLocs.length, is(5)); - assertThat(storageIDs.length, is(5)); - assertThat(storageTypes.length, is(5)); + assertThat(sortedLocs.length, is(totalDNs)); + assertThat(storageIDs.length, is(totalDNs)); + assertThat(storageTypes.length, is(totalDNs)); for (int i = 0; i < sortedLocs.length; i++) { assertThat(((DatanodeInfoWithStorage) sortedLocs[i]).getStorageID(), is(storageIDs[i])); @@ -414,6 +437,14 @@ public class 
TestDatanodeManager { is(DatanodeInfo.AdminStates.DECOMMISSIONED)); assertThat(sortedLocs[sortedLocs.length - 2].getAdminState(), is(DatanodeInfo.AdminStates.DECOMMISSIONED)); + // check that the StorageType of datanodes immediately + // preceding the decommissioned datanodes is PROVIDED + for (int i = 0; i < providedStorages; i++) { + assertThat( + ((DatanodeInfoWithStorage) + sortedLocs[sortedLocs.length - 3 - i]).getStorageType(), + is(StorageType.PROVIDED)); + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b339163/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java new file mode 100644 index 0000000..3b75806 --- /dev/null +++ b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java @@ -0,0 +1,345 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.Random; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockProvider; +import org.apache.hadoop.hdfs.server.common.BlockFormat; +import org.apache.hadoop.hdfs.server.common.FileRegionProvider; +import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat; +import org.apache.hadoop.hdfs.server.common.TextFileRegionProvider; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.*; + +public class TestNameNodeProvidedImplementation { + + @Rule public TestName name = new TestName(); + public static final Logger LOG = + LoggerFactory.getLogger(TestNameNodeProvidedImplementation.class); + + final Random r = new Random(); + final File fBASE = new File(MiniDFSCluster.getBaseDirectory()); + final Path BASE = new Path(fBASE.toURI().toString()); + final Path NAMEPATH = new Path(BASE, "providedDir");; + final Path NNDIRPATH = new Path(BASE, "nnDir"); + final Path 
BLOCKFILE = new Path(NNDIRPATH, "blocks.csv"); + final String SINGLEUSER = "usr1"; + final String SINGLEGROUP = "grp1"; + + Configuration conf; + MiniDFSCluster cluster; + + @Before + public void setSeed() throws Exception { + if (fBASE.exists() && !FileUtil.fullyDelete(fBASE)) { + throw new IOException("Could not fully delete " + fBASE); + } + long seed = r.nextLong(); + r.setSeed(seed); + System.out.println(name.getMethodName() + " seed: " + seed); + conf = new HdfsConfiguration(); + conf.set(SingleUGIResolver.USER, SINGLEUSER); + conf.set(SingleUGIResolver.GROUP, SINGLEGROUP); + + conf.set(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID, + DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true); + + conf.setClass(DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS, + BlockFormatProvider.class, BlockProvider.class); + conf.setClass(DFSConfigKeys.DFS_PROVIDER_CLASS, + TextFileRegionProvider.class, FileRegionProvider.class); + conf.setClass(DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS, + TextFileRegionFormat.class, BlockFormat.class); + + conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_WRITE_PATH, + BLOCKFILE.toString()); + conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_READ_PATH, + BLOCKFILE.toString()); + conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER, ","); + + File imageDir = new File(NAMEPATH.toUri()); + if (!imageDir.exists()) { + LOG.info("Creating directory: " + imageDir); + imageDir.mkdirs(); + } + + File nnDir = new File(NNDIRPATH.toUri()); + if (!nnDir.exists()) { + nnDir.mkdirs(); + } + + // create 10 random files under BASE + for (int i=0; i < 10; i++) { + File newFile = new File(new Path(NAMEPATH, "file" + i).toUri()); + if(!newFile.exists()) { + try { + LOG.info("Creating " + newFile.toString()); + newFile.createNewFile(); + Writer writer = new OutputStreamWriter( + new FileOutputStream(newFile.getAbsolutePath()), "utf-8"); + for(int j=0; j < 10*i; j++) { + writer.write("0"); + } + writer.flush(); 
+ writer.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + @After + public void shutdown() throws Exception { + try { + if (cluster != null) { + cluster.shutdown(true, true); + } + } finally { + cluster = null; + } + } + + void createImage(TreeWalk t, Path out, + Class<? extends BlockResolver> blockIdsClass) throws Exception { + ImageWriter.Options opts = ImageWriter.defaults(); + opts.setConf(conf); + opts.output(out.toString()) + .blocks(TextFileRegionFormat.class) + .blockIds(blockIdsClass); + try (ImageWriter w = new ImageWriter(opts)) { + for (TreePath e : t) { + w.accept(e); + } + } + } + + void startCluster(Path nspath, int numDatanodes, + StorageType[] storageTypes, + StorageType[][] storageTypesPerDatanode) + throws IOException { + conf.set(DFS_NAMENODE_NAME_DIR_KEY, nspath.toString()); + + if (storageTypesPerDatanode != null) { + cluster = new MiniDFSCluster.Builder(conf) + .format(false) + .manageNameDfsDirs(false) + .numDataNodes(numDatanodes) + .storageTypes(storageTypesPerDatanode) + .build(); + } else if (storageTypes != null) { + cluster = new MiniDFSCluster.Builder(conf) + .format(false) + .manageNameDfsDirs(false) + .numDataNodes(numDatanodes) + .storagesPerDatanode(storageTypes.length) + .storageTypes(storageTypes) + .build(); + } else { + cluster = new MiniDFSCluster.Builder(conf) + .format(false) + .manageNameDfsDirs(false) + .numDataNodes(numDatanodes) + .build(); + } + cluster.waitActive(); + } + + @Test(timeout = 20000) + public void testLoadImage() throws Exception { + final long seed = r.nextLong(); + LOG.info("NAMEPATH: " + NAMEPATH); + createImage(new RandomTreeWalk(seed), NNDIRPATH, FixedBlockResolver.class); + startCluster(NNDIRPATH, 0, new StorageType[] {StorageType.PROVIDED}, null); + + FileSystem fs = cluster.getFileSystem(); + for (TreePath e : new RandomTreeWalk(seed)) { + FileStatus rs = e.getFileStatus(); + Path hp = new Path(rs.getPath().toUri().getPath()); + assertTrue(fs.exists(hp)); + FileStatus 
hs = fs.getFileStatus(hp); + assertEquals(rs.getPath().toUri().getPath(), + hs.getPath().toUri().getPath()); + assertEquals(rs.getPermission(), hs.getPermission()); + assertEquals(rs.getLen(), hs.getLen()); + assertEquals(SINGLEUSER, hs.getOwner()); + assertEquals(SINGLEGROUP, hs.getGroup()); + assertEquals(rs.getAccessTime(), hs.getAccessTime()); + assertEquals(rs.getModificationTime(), hs.getModificationTime()); + } + } + + @Test(timeout=20000) + public void testBlockLoad() throws Exception { + conf.setClass(ImageWriter.Options.UGI_CLASS, + SingleUGIResolver.class, UGIResolver.class); + createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, + FixedBlockResolver.class); + startCluster(NNDIRPATH, 1, new StorageType[] {StorageType.PROVIDED}, null); + } + + @Test(timeout=500000) + public void testDefaultReplication() throws Exception { + int targetReplication = 2; + conf.setInt(FixedBlockMultiReplicaResolver.REPLICATION, targetReplication); + createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, + FixedBlockMultiReplicaResolver.class); + // make the last Datanode with only DISK + startCluster(NNDIRPATH, 3, null, + new StorageType[][] { + {StorageType.PROVIDED}, + {StorageType.PROVIDED}, + {StorageType.DISK}} + ); + // wait for the replication to finish + Thread.sleep(50000); + + FileSystem fs = cluster.getFileSystem(); + int count = 0; + for (TreePath e : new FSTreeWalk(NAMEPATH, conf)) { + FileStatus rs = e.getFileStatus(); + Path hp = removePrefix(NAMEPATH, rs.getPath()); + LOG.info("hp " + hp.toUri().getPath()); + //skip HDFS specific files, which may have been created later on. 
+ if (hp.toString().contains("in_use.lock") + || hp.toString().contains("current")) { + continue; + } + e.accept(count++); + assertTrue(fs.exists(hp)); + FileStatus hs = fs.getFileStatus(hp); + + if (rs.isFile()) { + BlockLocation[] bl = fs.getFileBlockLocations( + hs.getPath(), 0, hs.getLen()); + int i = 0; + for(; i < bl.length; i++) { + int currentRep = bl[i].getHosts().length; + assertEquals(targetReplication , currentRep); + } + } + } + } + + + static Path removePrefix(Path base, Path walk) { + Path wpath = new Path(walk.toUri().getPath()); + Path bpath = new Path(base.toUri().getPath()); + Path ret = new Path("/"); + while (!(bpath.equals(wpath) || "".equals(wpath.getName()))) { + ret = "".equals(ret.getName()) + ? new Path("/", wpath.getName()) + : new Path(new Path("/", wpath.getName()), + new Path(ret.toString().substring(1))); + wpath = wpath.getParent(); + } + if (!bpath.equals(wpath)) { + throw new IllegalArgumentException(base + " not a prefix of " + walk); + } + return ret; + } + + @Test(timeout=30000) + public void testBlockRead() throws Exception { + conf.setClass(ImageWriter.Options.UGI_CLASS, + FsUGIResolver.class, UGIResolver.class); + createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, + FixedBlockResolver.class); + startCluster(NNDIRPATH, 3, new StorageType[] {StorageType.PROVIDED}, null); + FileSystem fs = cluster.getFileSystem(); + Thread.sleep(2000); + int count = 0; + // read NN metadata, verify contents match + for (TreePath e : new FSTreeWalk(NAMEPATH, conf)) { + FileStatus rs = e.getFileStatus(); + Path hp = removePrefix(NAMEPATH, rs.getPath()); + LOG.info("hp " + hp.toUri().getPath()); + //skip HDFS specific files, which may have been created later on. 
+ if(hp.toString().contains("in_use.lock") + || hp.toString().contains("current")) { + continue; + } + e.accept(count++); + assertTrue(fs.exists(hp)); + FileStatus hs = fs.getFileStatus(hp); + assertEquals(hp.toUri().getPath(), hs.getPath().toUri().getPath()); + assertEquals(rs.getPermission(), hs.getPermission()); + assertEquals(rs.getOwner(), hs.getOwner()); + assertEquals(rs.getGroup(), hs.getGroup()); + + if (rs.isFile()) { + assertEquals(rs.getLen(), hs.getLen()); + try (ReadableByteChannel i = Channels.newChannel( + new FileInputStream(new File(rs.getPath().toUri())))) { + try (ReadableByteChannel j = Channels.newChannel( + fs.open(hs.getPath()))) { + ByteBuffer ib = ByteBuffer.allocate(4096); + ByteBuffer jb = ByteBuffer.allocate(4096); + while (true) { + int il = i.read(ib); + int jl = j.read(jb); + if (il < 0 || jl < 0) { + assertEquals(il, jl); + break; + } + ib.flip(); + jb.flip(); + int cmp = Math.min(ib.remaining(), jb.remaining()); + for (int k = 0; k < cmp; ++k) { + assertEquals(ib.get(), jb.get()); + } + ib.compact(); + jb.compact(); + } + + } + } + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
