http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java index 443576d..fa816e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java @@ -78,6 +78,17 @@ public final class HdfsConstants { public static final String CLIENT_NAMENODE_PROTOCOL_NAME = "org.apache.hadoop.hdfs.protocol.ClientProtocol"; + /* + * These values correspond to the values used by the system default erasure + * coding schema. + * TODO: to be removed once all places use schema. + */ + + public static final byte NUM_DATA_BLOCKS = 6; + public static final byte NUM_PARITY_BLOCKS = 3; + // The chunk size for striped block which is used by erasure coding + public static final int BLOCK_STRIPED_CELL_SIZE = 64 * 1024; + // SafeMode actions public enum SafeModeAction { SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET
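The constants added above describe the system default erasure coding layout: a striped block group holds NUM_DATA_BLOCKS (6) data blocks plus NUM_PARITY_BLOCKS (3) parity blocks, and file data is split into BLOCK_STRIPED_CELL_SIZE (64 KB) cells laid out round-robin across the data blocks. The sketch below is not part of this patch (class and method names are illustrative); it only shows the cell-to-internal-block arithmetic these constants imply.

// Illustrative sketch assuming the round-robin cell layout described in the
// LocatedStripedBlock javadoc later in this commit; not code from the change set.
public final class StripedLayoutSketch {
  static final int NUM_DATA_BLOCKS = 6;       // HdfsConstants.NUM_DATA_BLOCKS
  static final int NUM_PARITY_BLOCKS = 3;     // HdfsConstants.NUM_PARITY_BLOCKS
  static final int CELL_SIZE = 64 * 1024;     // HdfsConstants.BLOCK_STRIPED_CELL_SIZE

  /** Index of the data block (0..5) holding the cell at this block-group offset. */
  static int dataBlockIndex(long offsetInBlockGroup) {
    long cellIndex = offsetInBlockGroup / CELL_SIZE;
    return (int) (cellIndex % NUM_DATA_BLOCKS);
  }

  /** Offset of the same byte inside that internal data block. */
  static long offsetInInternalBlock(long offsetInBlockGroup) {
    long cellIndex = offsetInBlockGroup / CELL_SIZE;
    return (cellIndex / NUM_DATA_BLOCKS) * CELL_SIZE + offsetInBlockGroup % CELL_SIZE;
  }

  public static void main(String[] args) {
    long offset = 3L * CELL_SIZE + 100;                   // 100 bytes into the 4th cell
    System.out.println(dataBlockIndex(offset));           // prints 3
    System.out.println(offsetInInternalBlock(offset));    // prints 100
  }
}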
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java index 34f429a..8c902b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.io.erasurecode.ECSchema; /** Interface that represents the over the wire information for a file. */ @@ -48,6 +49,9 @@ public class HdfsFileStatus { private final FileEncryptionInfo feInfo; + private final ECSchema ecSchema; + private final int stripeCellSize; + // Used by dir, not including dot and dotdot. Always zero for a regular file. private final int childrenNum; private final byte storagePolicy; @@ -73,7 +77,7 @@ public class HdfsFileStatus { long blocksize, long modification_time, long access_time, FsPermission permission, String owner, String group, byte[] symlink, byte[] path, long fileId, int childrenNum, FileEncryptionInfo feInfo, - byte storagePolicy) { + byte storagePolicy, ECSchema ecSchema, int stripeCellSize) { this.length = length; this.isdir = isdir; this.block_replication = (short)block_replication; @@ -93,6 +97,8 @@ public class HdfsFileStatus { this.childrenNum = childrenNum; this.feInfo = feInfo; this.storagePolicy = storagePolicy; + this.ecSchema = ecSchema; + this.stripeCellSize = stripeCellSize; } /** @@ -250,6 +256,14 @@ public class HdfsFileStatus { return feInfo; } + public ECSchema getECSchema() { + return ecSchema; + } + + public int getStripeCellSize() { + return stripeCellSize; + } + public final int getChildrenNum() { return childrenNum; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java index cc13f10..a9596bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java @@ -49,14 +49,14 @@ public class LocatedBlock { // else false. 
If block has few corrupt replicas, they are filtered and // their locations are not part of this object private boolean corrupt; - private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>(); + private Token<BlockTokenIdentifier> blockToken = new Token<>(); /** * List of cached datanode locations */ private DatanodeInfo[] cachedLocs; // Used when there are no locations - private static final DatanodeInfoWithStorage[] EMPTY_LOCS = + static final DatanodeInfoWithStorage[] EMPTY_LOCS = new DatanodeInfoWithStorage[0]; public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) { @@ -203,4 +203,8 @@ public class LocatedBlock { + "; locs=" + Arrays.asList(locs) + "}"; } + + public boolean isStriped() { + return false; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java index e4896977..735e7b2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java @@ -24,6 +24,7 @@ import java.util.Comparator; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.io.erasurecode.ECSchema; /** * Collection of blocks with their locations and the file length. @@ -37,6 +38,8 @@ public class LocatedBlocks { private final LocatedBlock lastLocatedBlock; private final boolean isLastBlockComplete; private final FileEncryptionInfo fileEncryptionInfo; + private final ECSchema ecSchema; + private final int stripeCellSize; public LocatedBlocks() { fileLength = 0; @@ -45,17 +48,22 @@ public class LocatedBlocks { lastLocatedBlock = null; isLastBlockComplete = false; fileEncryptionInfo = null; + ecSchema = null; + stripeCellSize = 0; } public LocatedBlocks(long flength, boolean isUnderConstuction, - List<LocatedBlock> blks, LocatedBlock lastBlock, - boolean isLastBlockCompleted, FileEncryptionInfo feInfo) { + List<LocatedBlock> blks, LocatedBlock lastBlock, + boolean isLastBlockCompleted, FileEncryptionInfo feInfo, + ECSchema ecSchema, int stripeCellSize) { fileLength = flength; blocks = blks; underConstruction = isUnderConstuction; this.lastLocatedBlock = lastBlock; this.isLastBlockComplete = isLastBlockCompleted; this.fileEncryptionInfo = feInfo; + this.ecSchema = ecSchema; + this.stripeCellSize = stripeCellSize; } /** @@ -112,6 +120,20 @@ public class LocatedBlocks { } /** + * @return The ECSchema for ErasureCoded file, null otherwise. + */ + public ECSchema getECSchema() { + return ecSchema; + } + + /** + * @return Stripe Cell size for ErasureCoded file, 0 otherwise. + */ + public int getStripeCellSize() { + return stripeCellSize; + } + + /** * Find block containing specified offset. * * @return block if found, or null otherwise. 
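With the fields above, LocatedBlocks carries the ECSchema and stripe cell size alongside the block list, so a client can tell from a single getBlockLocations response whether a file is striped. A minimal sketch of that check follows (the helper name is made up for illustration); DFSClient#open later in this commit performs the equivalent test to choose between DFSStripedInputStream and DFSInputStream.

// Illustrative helper, not part of the patch: decide whether the striped read
// path applies, based on the new LocatedBlocks fields.
static boolean isErasureCoded(LocatedBlocks blocks) {
  ECSchema schema = blocks.getECSchema();      // null for a non-EC file
  if (schema == null) {
    return false;
  }
  int cellSize = blocks.getStripeCellSize();   // 0 for a non-EC file, > 0 otherwise
  return cellSize > 0;
}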
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java new file mode 100644 index 0000000..a9a80c2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.security.token.Token; + +import java.util.Arrays; + +/** + * {@link LocatedBlock} with striped block support. For a striped block, each + * datanode storage is associated with a block in the block group. We need to + * record the index (in the striped block group) for each of them. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class LocatedStripedBlock extends LocatedBlock { + private static final int[] EMPTY_INDICES = {}; + private static final Token<BlockTokenIdentifier> EMPTY_TOKEN = new Token<>(); + + private int[] blockIndices; + private Token<BlockTokenIdentifier>[] blockTokens; + + public LocatedStripedBlock(ExtendedBlock b, DatanodeInfo[] locs, + String[] storageIDs, StorageType[] storageTypes, int[] indices, + long startOffset, boolean corrupt, DatanodeInfo[] cachedLocs) { + super(b, locs, storageIDs, storageTypes, startOffset, corrupt, cachedLocs); + + if (indices == null) { + this.blockIndices = EMPTY_INDICES; + } else { + this.blockIndices = new int[indices.length]; + System.arraycopy(indices, 0, blockIndices, 0, indices.length); + } + blockTokens = new Token[blockIndices.length]; + for (int i = 0; i < blockIndices.length; i++) { + blockTokens[i] = EMPTY_TOKEN; + } + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{" + getBlock() + + "; getBlockSize()=" + getBlockSize() + + "; corrupt=" + isCorrupt() + + "; offset=" + getStartOffset() + + "; locs=" + Arrays.asList(getLocations()) + + "; indices=" + Arrays.toString(blockIndices) + + "}"; + } + + public int[] getBlockIndices() { + return this.blockIndices; + } + + @Override + public boolean isStriped() { + return true; + } + + public Token<BlockTokenIdentifier>[] getBlockTokens() { + return blockTokens; + } + + public void setBlockTokens(Token<BlockTokenIdentifier>[] tokens) { + this.blockTokens = tokens; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java index ac19d44..a6c7b10 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java @@ -61,7 +61,7 @@ public class SnapshottableDirectoryStatus { int snapshotNumber, int snapshotQuota, byte[] parentFullPath) { this.dirStatus = new HdfsFileStatus(0, true, 0, 0, modification_time, access_time, permission, owner, group, null, localName, inodeId, - childrenNum, null, HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED); + childrenNum, null, HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, null, 0); this.snapshotNumber = snapshotNumber; this.snapshotQuota = snapshotQuota; this.parentFullPath = parentFullPath; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java index 713836c..eeadd73 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java +++
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java @@ -132,7 +132,7 @@ class JsonUtilClient { blockSize, mTime, aTime, permission, owner, group, symlink, DFSUtilClient.string2Bytes(localName), fileId, childrenNum, null, - storagePolicy); + storagePolicy, null, 0); } /** Convert a Json map to an ExtendedBlock object. */ @@ -503,7 +503,7 @@ class JsonUtilClient { (Map<?, ?>) m.get("lastLocatedBlock")); final boolean isLastBlockComplete = (Boolean)m.get("isLastBlockComplete"); return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks, - lastLocatedBlock, isLastBlockComplete, null); + lastLocatedBlock, isLastBlockComplete, null, null, 0); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 7d32568..62db8ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -34,6 +34,7 @@ import "acl.proto"; import "xattr.proto"; import "encryption.proto"; import "inotify.proto"; +import "erasurecoding.proto"; /** * The ClientNamenodeProtocol Service defines the interface between a client @@ -866,8 +867,14 @@ service ClientNamenodeProtocol { returns(ListEncryptionZonesResponseProto); rpc getEZForPath(GetEZForPathRequestProto) returns(GetEZForPathResponseProto); + rpc createErasureCodingZone(CreateErasureCodingZoneRequestProto) + returns(CreateErasureCodingZoneResponseProto); rpc getCurrentEditLogTxid(GetCurrentEditLogTxidRequestProto) returns(GetCurrentEditLogTxidResponseProto); rpc getEditsFromTxid(GetEditsFromTxidRequestProto) returns(GetEditsFromTxidResponseProto); + rpc getECSchemas(GetECSchemasRequestProto) + returns(GetECSchemasResponseProto); + rpc getErasureCodingZone(GetErasureCodingZoneRequestProto) + returns(GetErasureCodingZoneResponseProto); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto new file mode 100644 index 0000000..56bb7a2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +option java_package = "org.apache.hadoop.hdfs.protocol.proto"; +option java_outer_classname = "ErasureCodingProtos"; +option java_generate_equals_and_hash = true; +package hadoop.hdfs; + +import "hdfs.proto"; + +/** + * ErasureCodingZone + */ +message ErasureCodingZoneProto { + required string dir = 1; + required ECSchemaProto schema = 2; + required uint32 cellSize = 3; +} + +message CreateErasureCodingZoneRequestProto { + required string src = 1; + optional ECSchemaProto schema = 2; + optional uint32 cellSize = 3; +} + +message CreateErasureCodingZoneResponseProto { +} + +message GetECSchemasRequestProto { // void request +} + +message GetECSchemasResponseProto { + repeated ECSchemaProto schemas = 1; +} + +message GetErasureCodingZoneRequestProto { + required string src = 1; // path to get the zone info +} + +message GetErasureCodingZoneResponseProto { + optional ErasureCodingZoneProto ECZone = 1; +} + +/** + * Block erasure coding recovery info + */ +message BlockECRecoveryInfoProto { + required ExtendedBlockProto block = 1; + required DatanodeInfosProto sourceDnInfos = 2; + required DatanodeInfosProto targetDnInfos = 3; + required StorageUuidsProto targetStorageUuids = 4; + required StorageTypesProto targetStorageTypes = 5; + repeated uint32 liveBlockIndices = 6; + required ECSchemaProto ecSchema = 7; + required uint32 cellSize = 8; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto index 86fb462..d2cb665 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto @@ -220,6 +220,10 @@ message LocatedBlockProto { repeated bool isCached = 6 [packed=true]; // if a location in locs is cached repeated StorageTypeProto storageTypes = 7; repeated string storageIDs = 8; + + // striped block related fields + repeated uint32 blockIndex = 9; // used for striped block to indicate block index for each storage + repeated hadoop.common.TokenProto blockTokens = 10; // each internal block has a block token } message DataEncryptionKeyProto { @@ -300,6 +304,29 @@ message LocatedBlocksProto { optional LocatedBlockProto lastBlock = 4; required bool isLastBlockComplete = 5; optional FileEncryptionInfoProto fileEncryptionInfo = 6; + + // Optional field for erasure coding + optional ECSchemaProto eCSchema = 7; + optional uint32 stripeCellSize = 8; +} + +/** + * ECSchema options entry + */ +message ECSchemaOptionEntryProto { + required string key = 1; + required string value = 2; +} + +/** + * ECSchema for erasurecoding + */ +message ECSchemaProto { + required string schemaName = 1; + required string codecName = 2; + required uint32 dataUnits = 3; + required uint32 parityUnits = 4; + repeated ECSchemaOptionEntryProto options = 5; } /** @@ -336,7 +363,11 @@ message HdfsFileStatusProto { optional FileEncryptionInfoProto fileEncryptionInfo = 15; optional uint32 storagePolicy = 16 [default = 0]; // block storage policy id -} + + // Optional field for erasure coding + optional ECSchemaProto ecSchema = 17; + optional uint32 stripeCellSize = 18; +} /** * Checksum algorithms/types used in HDFS @@ -498,6 +529,9 @@ message BlockWithLocationsProto { repeated string datanodeUuids = 2; // Datanodes with replicas 
of the block repeated string storageUuids = 3; // Storages with replicas of the block repeated StorageTypeProto storageTypes = 4; + + optional bytes indices = 5; + optional uint32 dataBlockNum = 6; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt new file mode 100755 index 0000000..45afd2c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -0,0 +1,396 @@ + BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS + + HDFS-7347. Configurable erasure coding policy for individual files and + directories ( Zhe Zhang via vinayakumarb ) + + HDFS-7339. Representing striped block groups in NameNode with hierarchical + naming protocol ( Zhe Zhang ) + + HDFS-7652. Process block reports for erasure coded blocks (Zhe Zhang) + + HDFS-7716. Erasure Coding: extend BlockInfo to handle EC info (Jing Zhao) + + HDFS-7749. Erasure Coding: Add striped block support in INodeFile (Jing Zhao) + + HDFS-7837. Erasure Coding: allocate and persist striped blocks in NameNode + (Jing Zhao via Zhe Zhang) + + HDFS-7872. Erasure Coding: INodeFile.dumpTreeRecursively() supports to print + striped blocks (Takuya Fukudome via jing9) + + HDFS-7853. Erasure coding: extend LocatedBlocks to support reading from + striped files (Jing Zhao) + + HDFS-7826. Erasure Coding: Update INodeFile quota computation for striped + blocks ( Kai Sasaki via jing9 ) + + HDFS-7912. Erasure Coding: track BlockInfo instead of Block in + UnderReplicatedBlocks and PendingReplicationBlocks (Jing Zhao) + + HDFS-7369. Erasure coding: distribute recovery work for striped blocks to + DataNode (Zhe Zhang) + + HDFS-7864. Erasure Coding: Update safemode calculation for striped blocks + (GAO Rui via jing9) + + HDFS-7827. Erasure Coding: support striped blocks in non-protobuf fsimage + ( Hui Zheng via jing9 ) + + HDFS-7616. Add a test for BlockGroup support in FSImage. + (Takuya Fukudome via szetszwo) + + HDFS-7907. Erasure Coding: track invalid, corrupt, and under-recovery striped + blocks in NameNode (Jing Zhao) + + HDFS-8005. Erasure Coding: simplify striped block recovery work computation + and add tests (Jing Zhao) + + HDFS-7617. Add unit tests for editlog transactions for EC + (Hui Zheng via Zhe Zhang) + + HDFS-7839. Erasure coding: implement facilities in NameNode to create and + manage EC zones (Zhe Zhang) + + HDFS-7969. Erasure coding: NameNode support for lease recovery of striped + block groups. (Zhe Zhang) + + HDFS-7782. Erasure coding: pread from files in striped layout. + (Zhe Zhang and Jing Zhao via Zhe Zhang) + + HDFS-8023. Erasure Coding: retrieve eraure coding schema for a file from + NameNode (vinayakumarb) + + HDFS-8074. Define a system-wide default EC schema. (Kai Zheng) + + HDFS-8077. Erasure coding: fix bugs in EC zone and symlinks. + (Jing Zhao and Zhe Zhang via Jing Zhao) + + HDFS-8104. Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng) + + HDFS-7889. Subclass DFSOutputStream to support writing striping layout files. (Li Bo via Kai Zheng) + + HDFS-8090. Erasure Coding: Add RPC to client-namenode to list all + ECSchemas loaded in Namenode. (vinayakumarb) + + HDFS-8122. Erasure Coding: Support specifying ECSchema during creation of ECZone. + (Vinayakumar B via Zhe Zhang) + + HDFS-8114. 
Erasure coding: Add auditlog FSNamesystem#createErasureCodingZone if this + operation fails. (Rakesh R via Zhe Zhang) + + HDFS-8123. Erasure Coding: Better to move EC related proto messages to a + separate erasurecoding proto file (Rakesh R via vinayakumarb) + + HDFS-7349. Support DFS command for the EC encoding (vinayakumarb) + + HDFS-8120. Erasure coding: created util class to analyze striped block groups. + (Contributed by Zhe Zhang and Li Bo via Jing Zhao) + + HDFS-7994. Detect if resevered EC Block ID is already used during namenode + startup. (Hui Zheng via szetszwo) + + HDFS-8167. BlockManager.addBlockCollectionWithCheck should check if the block is a striped block. (Hui Zheng via zhz). + + HDFS-8166. DFSStripedOutputStream should not create empty blocks. (Jing Zhao) + + HDFS-7937. Erasure Coding: INodeFile quota computation unit tests. + (Kai Sasaki via Jing Zhao) + + HDFS-8145. Fix the editlog corruption exposed by failed TestAddStripedBlocks. + (Jing Zhao) + + HDFS-8146. Protobuf changes for BlockECRecoveryCommand and its fields for + making it ready for transfer to DN (Uma Maheswara Rao G via vinayakumarb) + + HDFS-8181. createErasureCodingZone sets retryCache state as false always + (Uma Maheswara Rao G via vinayakumarb) + + HDFS-8190. StripedBlockUtil.getInternalBlockLength may have overflow error. + (szetszwo) + + HDFS-8216. TestDFSStripedOutputStream should use BlockReaderTestUtil to + create BlockReader. (szetszwo via Zhe Zhang) + + HDFS-8212. DistributedFileSystem.createErasureCodingZone should pass schema + in FileSystemLinkResolver. (szetszwo via Zhe Zhang) + + HDFS-8024. Erasure Coding: ECworker frame, basics, bootstraping and configuration. + (umamahesh) + + HDFS-8156. Add/implement necessary APIs even we just have the system default + schema. (Kai Zheng via Zhe Zhang) + + HDFS-8136. Client gets and uses EC schema when reads and writes a stripping + file. (Kai Sasaki via Kai Zheng) + + HDFS-8233. Fix DFSStripedOutputStream#getCurrentBlockGroupBytes when the last + stripe is at the block group boundary. (jing9) + + HDFS-8223. Should calculate checksum for parity blocks in DFSStripedOutputStream. + (Yi Liu via jing9) + + HDFS-8228. Erasure Coding: SequentialBlockGroupIdGenerator#nextValue may cause + block id conflicts (Jing Zhao via Zhe Zhang) + + HDFS-8033. Erasure coding: stateful (non-positional) read from files in + striped layout (Zhe Zhang) + + HDFS-8230. Erasure Coding: Ignore DatanodeProtocol#DNA_ERASURE_CODING_RECOVERY + commands from standbynode if any (vinayakumarb) + + HDFS-8189. ClientProtocol#createErasureCodingZone API was wrongly annotated + as Idempotent (vinayakumarb) + + HDFS-8235. Erasure Coding: Create DFSStripedInputStream in DFSClient#open. + (Kai Sasaki via jing9) + + HDFS-8272. Erasure Coding: simplify the retry logic in DFSStripedInputStream + (stateful read). (Jing Zhao via Zhe Zhang) + + HDFS-8282. Erasure coding: move striped reading logic to StripedBlockUtil. + (Zhe Zhang) + + HDFS-8183. Erasure Coding: Improve DFSStripedOutputStream closing of + datastreamer threads. (Rakesh R via Zhe Zhang) + + HDFS-8308. Erasure Coding: NameNode may get blocked in waitForLoadingFSImage() + when loading editlog. (jing9) + + HDFS-7949. WebImageViewer need support file size calculation with striped + blocks. (Rakesh R via Zhe Zhang) + + HDFS-8316. Erasure coding: refactor EC constants to be consistent with HDFS-8249. + (Zhe Zhang via jing9) + + HDFS-8281. Erasure Coding: implement parallel stateful reading for striped layout. + (jing9) + + HDFS-8137. 
Send the EC schema to DataNode via EC encoding/recovering command(umamahesh) + + HDFS-8242. Erasure Coding: XML based end-to-end test for ECCli commands + (Rakesh R via vinayakumarb) + + HDFS-8324. Add trace info to DFSClient#getErasureCodingZoneInfo(..) (vinayakumarb via + umamahesh) + + HDFS-7672. Handle write failure for stripping blocks and refactor the + existing code in DFSStripedOutputStream and StripedDataStreamer. (szetszwo) + + HDFS-7348. Erasure Coding: DataNode reconstruct striped blocks. + (Yi Liu via Zhe Zhang) + + HADOOP-11921. Enhance tests for erasure coders. (Kai Zheng) + + HDFS-8334. Erasure coding: rename DFSStripedInputStream related test + classes. (Zhe Zhang) + + HDFS-8129. Erasure Coding: Maintain consistent naming for Erasure Coding related classes - EC/ErasureCoding + (umamahesh) + + HDFS-8203. Erasure Coding: Seek and other Ops in DFSStripedInputStream. + (Yi Liu via jing9) + + HDFS-8289. Erasure Coding: add ECSchema to HdfsFileStatus. (Yong Zhang via + jing9) + + HDFS-8355. Erasure Coding: Refactor BlockInfo and BlockInfoUnderConstruction. + (Tsz Wo Nicholas Sze via jing9) + + HDFS-7678. Erasure coding: DFSInputStream with decode functionality (pread). + (Zhe Zhang) + + HDFS-8372. Erasure coding: compute storage type quotas for striped files, + to be consistent with HDFS-8327. (Zhe Zhang via jing9) + + HDFS-8368. Erasure Coding: DFS opening a non-existent file need to be + handled properly (Rakesh R via zhz) + + HDFS-8363. Erasure Coding: DFSStripedInputStream#seekToNewSource. (yliu) + + HDFS-8195. Erasure coding: Fix file quota change when we complete/commit + the striped blocks. (Takuya Fukudome via zhz) + + HDFS-8364. Erasure coding: fix some minor bugs in EC CLI + (Walter Su via vinayakumarb) + + HDFS-8391. NN should consider current EC tasks handling count from DN while + assigning new tasks. (umamahesh) + + HDFS-8367. BlockInfoStriped uses EC schema. (Kai Sasaki via Kai Zheng) + + HDFS-8352. Erasure Coding: test webhdfs read write stripe file. (waltersu4549) + + HDFS-8417. Erasure Coding: Pread failed to read data starting from not-first stripe. + (Walter Su via jing9) + + HDFS-8418. Fix the isNeededReplication calculation for Striped block in NN. + (Yi Liu via jing9) + + HDFS-8320. Erasure coding: consolidate striping-related terminologies. (zhz) + + HDFS-8366. Erasure Coding: Make the timeout parameter of polling blocking queue + configurable in DFSStripedOutputStream. (Li Bo) + + HDFS-8378. Erasure Coding: Few improvements for the erasure coding worker. + (Rakesh R via waltersu4549) + + HDFS-8375. Add cellSize as an XAttr to ECZone. ( Vinayakumar B via zhz). + + HDFS-8428. Erasure Coding: Fix the NullPointerException when deleting file. + (Yi Liu via zhz). + + HDFS-8323. Bump GenerationStamp for write faliure in DFSStripedOutputStream. + (Tsz Wo Nicholas Sze via jing9) + + HDFS-8427. Remove dataBlockNum and parityBlockNum from BlockInfoStriped. + (Kai Sasaki via jing9) + + HDFS-8186. Erasure coding: Make block placement policy for EC file configurable. + (Walter Su via zhz) + + HDFS-8294. Erasure Coding: Fix Findbug warnings present in erasure coding. + (Rakesh R via zhz) + + HDFS-8441. Erasure Coding: make condition check earlier for setReplication. + (waltersu4549) + + HDFS-7768. Change fsck to support EC files. (Takanobu Asanuma via szetszwo) + + HDFS-8382. Remove chunkSize and initialize from erasure coder. (Kai Zheng) + + HDFS-8408. Revisit and refactor ErasureCodingInfo (vinayakumarb) + + HDFS-8479. 
Erasure coding: fix striping related logic in FSDirWriteFileOp to + sync with HDFS-8421. (Zhe Zhang via jing9) + + HDFS-8481. Erasure coding: remove workarounds in client side stripped blocks + recovering. (zhz) + + HDFS-8336. Expose some administrative erasure coding operations to HdfsAdmin + (Uma Maheswara Rao G via vinayakumarb) + + HDFS-8444. Erasure Coding: fix cannot rename a zone dir + (Walter Su via vinayakumarb) + + HDFS-8517. Fix a decoding issue in stripped block recovering in client side. + (Kai Zheng via jing9) + + HDFS-8453. Erasure coding: properly handle start offset for internal blocks + in a block group. (Zhe Zhang via jing9) + + HDFS-7621. Erasure Coding: update the Balancer/Mover data migration logic. + (Walter Su via zhz) + + HDFS-8328. Follow-on to update decode for DataNode striped blocks + reconstruction. (yliu) + + HDFS-8319. Erasure Coding: support decoding for stateful read. + (Jing Zhao via zhz) + + HDFS-8460. Erasure Coding: stateful read result doesn't match data + occasionally because of flawed test. (Walter Su via zhz) + + HDFS-8556. Erasure Coding: Fix usage of 'createZone' (vinayakumarb) + + HDFS-8571. Fix TestErasureCodingCli test (Vinayakumar B via waltersu4549) + + HDFS-8450. Erasure Coding: Consolidate erasure coding zone related + implementation into a single class (Rakesh R via vinayakumarb) + + HDFS-8585. Erasure Coding: Remove dataBlockNum and parityBlockNum from + StripedBlockProto. (Yi Liu via jing9) + + HDFS-8559. Erasure Coding: fix non-protobuf fsimage for striped blocks. + (Jing Zhao via yliu) + + HDFS-8580. Erasure coding: Persist cellSize in BlockInfoStriped and + StripedBlocksFeature. (Walter Su via jing9) + + HDFS-8466. Refactor BlockInfoContiguous and fix NPE in + TestBlockInfo#testCopyConstructor() (vinayakumarb) + + HDFS-8254. Avoid assigning a leading streamer in StripedDataStreamer to + tolerate datanode failure. (Tsz Wo Nicholas Sze via jing9) + + HDFS-8543. Erasure Coding: processOverReplicatedBlock() handles striped block. + (Walter Su via jing9) + + HDFS-8602. Erasure Coding: Client can't read(decode) the EC files which have + corrupt blocks. (jing9 and Kai Sasaki) + + HDFS-8567. Erasure Coding: SafeMode handles file smaller than a full stripe. + (Walter Su via jing9) + + HDFS-8253. DFSStripedOutputStream.closeThreads releases cellBuffers + multiple times. (Kai Sasaki via szetszwo) + + HDFS-8468. 2 RPC calls for every file read in DFSClient#open(..) resulting in + double Audit log entries (vinayakumarb) + + HDFS-8684. Erasure Coding: fix some block number calculation for striped + block. (yliu) + + HDFS-8461. Erasure coding: fix priority level of UnderReplicatedBlocks for + striped block. (Walter Su via jing9) + + HDFS-8719. Erasure Coding: client generates too many small packets when + writing parity data. (Li Bo via waltersu4549) + + HDFS-8563. Erasure Coding: fsck handles file smaller than a full stripe. + (Walter Su via jing9) + + HDFS-8484. Erasure coding: Two contiguous blocks occupy IDs belong to same + striped group. (Walter Su via jing9) + + HDFS-8744. Erasure Coding: the number of chunks in packet is not updated + when writing parity data. (Li Bo) + + HDFS-8669. Erasure Coding: handle missing internal block locations in + DFSStripedInputStream. (jing9) + + HDFS-8702. Erasure coding: update BlockManager.blockHasEnoughRacks(..) logic + for striped block. (Kai Sasaki via jing9) + + HDFS-8734. Erasure Coding: fix one cell need two packets. (Walter Su via + jing9) + + HDFS-8619. 
Erasure Coding: revisit replica counting for striped blocks. + (Jing Zhao via yliu) + + HDFS-8058. Erasure coding: use BlockInfo[] for both striped and contiguous + blocks in INodeFile. (Zhe Zhang and Yi Liu via zhz) + + HDFS-8787. Erasure coding: rename BlockInfoContiguousUC and BlockInfoStripedUC + to be consistent with trunk. (zhz) + + HDFS-8433. Erasure coding: set blockToken in LocatedStripedBlock.(waltersu4549) + + HDFS-8760. Erasure Coding: reuse BlockReader when reading the same block in pread. + (jing9) + + HDFS-8781. Erasure Coding: Correctly handle BlockManager#InvalidateBlocks for + striped block. (Yi Liu via jing9) + + HDFS-8813. Erasure Coding: Client no need to decode missing parity blocks. + (Walter Su via jing9) + + HDFS-8798. Erasure Coding: fix DFSStripedInputStream/DFSStripedOutputStream + re-fetch token when expired. (Walter Su via jing9) + + HDFS-8769. Erasure coding: unit test for SequentialBlockGroupIdGenerator. + (Rakesh R via waltersu4549) + + HDFS-8202. Improve end to end stirpping file test to add erasure recovering + test. (Xinwei Qin via zhz) + + HDFS-8804. Erasure Coding: use DirectBufferPool in DFSStripedInputStream for + buffer allocation. (jing9) + + HDFS-8399. Erasure Coding: unit test the behaviour of BlockManager recovery + work for the deleted blocks. (Rakesh R via zhz) + + HDFS-8857. Erasure Coding: Fix ArrayIndexOutOfBoundsException in + TestWriteStripedFileWithFailure. (Li Bo) + + HDFS-8827. Erasure Coding: Fix NPE when NameNode processes over-replicated + striped blocks. (Walter Su and Takuya Fukudome via jing9) http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs index 852b040..20c1302 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs @@ -34,6 +34,7 @@ function hadoop_usage hadoop_add_subcommand "debug" "run a Debug Admin to execute HDFS debug commands" hadoop_add_subcommand "dfs" "run a filesystem command on the file system" hadoop_add_subcommand "dfsadmin" "run a DFS admin client" + hadoop_add_subcommand "erasurecode" "configure HDFS erasure coding zones" hadoop_add_subcommand "fetchdt" "fetch a delegation token from the NameNode" hadoop_add_subcommand "fsck" "run a DFS filesystem checking utility" hadoop_add_subcommand "getconf" "get config values from configuration" @@ -133,6 +134,11 @@ case ${COMMAND} in hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS" HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}" ;; + erasurecode) + CLASS=org.apache.hadoop.hdfs.tools.erasurecode.ECCli + hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS" + HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}" + ;; fetchdt) CLASS=org.apache.hadoop.hdfs.tools.DelegationTokenFetcher ;; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java index aa3e8ba..8f988af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs; +import java.io.Closeable; import java.io.IOException; import java.util.EnumSet; @@ -24,13 +25,14 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; +import org.apache.hadoop.util.DataChecksum; /** * A BlockReader is responsible for reading a single block * from a single datanode. */ @InterfaceAudience.Private -public interface BlockReader extends ByteBufferReadable { +public interface BlockReader extends ByteBufferReadable, Closeable { /* same interface as inputStream java.io.InputStream#read() @@ -62,6 +64,7 @@ public interface BlockReader extends ByteBufferReadable { * * @throws IOException */ + @Override // java.io.Closeable void close() throws IOException; /** @@ -99,4 +102,9 @@ public interface BlockReader extends ByteBufferReadable { * supported. */ ClientMmap getClientMmap(EnumSet<ReadOption> opts); + + /** + * @return The DataChecksum used by the read block + */ + DataChecksum getDataChecksum(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java index d913f3a..0b2420d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@ -738,4 +738,9 @@ class BlockReaderLocal implements BlockReader { void forceUnanchorable() { replica.getSlot().makeUnanchorable(); } + + @Override + public DataChecksum getDataChecksum() { + return checksum; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java index c16ffdf..04cf733 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java @@ -732,4 +732,9 @@ class BlockReaderLocalLegacy implements BlockReader { public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { return null; } + + @Override + public DataChecksum getDataChecksum() { + return checksum; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 3f4621e..8bf1444 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -118,6 +118,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone; import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -163,6 +164,7 @@ import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; @@ -237,6 +239,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, private static final DFSHedgedReadMetrics HEDGED_READ_METRIC = new DFSHedgedReadMetrics(); private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL; + private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL; private final Sampler<?> traceSampler; private final int smallBufferSize; @@ -374,8 +377,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, dfsClientConf); if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) { - this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize()); + this.initThreadsNumForHedgedReads(dfsClientConf. + getHedgedReadThreadpoolSize()); } + + this.initThreadsNumForStripedReads(dfsClientConf. + getStripedReadThreadpoolSize()); this.saslClient = new SaslDataTransferClient( conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth); @@ -1167,7 +1174,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, // Get block info from namenode TraceScope scope = getPathTraceScope("newDFSInputStream", src); try { - return new DFSInputStream(this, src, verifyChecksum, null); + LocatedBlocks locatedBlocks = getLocatedBlocks(src, 0); + if (locatedBlocks != null) { + ECSchema schema = locatedBlocks.getECSchema(); + if (schema != null) { + return new DFSStripedInputStream(this, src, verifyChecksum, schema, + locatedBlocks.getStripeCellSize(), locatedBlocks); + } + return new DFSInputStream(this, src, verifyChecksum, locatedBlocks); + } else { + throw new IOException("Cannot open filename " + src); + } } finally { scope.close(); } @@ -1299,7 +1316,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, Progressable progress, int buffersize, ChecksumOpt checksumOpt) throws IOException { - return create(src, permission, flag, createParent, replication, blockSize, + return create(src, permission, flag, createParent, replication, blockSize, progress, buffersize, checksumOpt, null); } @@ -2995,6 +3012,21 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, return new EncryptionZoneIterator(namenode, traceSampler); } + public void createErasureCodingZone(String src, ECSchema schema, int cellSize) + throws IOException { + checkOpen(); + TraceScope scope = getPathTraceScope("createErasureCodingZone", src); + try { + namenode.createErasureCodingZone(src, schema, cellSize); + } catch (RemoteException re) { + throw re.unwrapRemoteException(AccessControlException.class, + SafeModeException.class, + 
UnresolvedPathException.class); + } finally { + scope.close(); + } + } + public void setXAttr(String src, String name, byte[] value, EnumSet<XAttrSetFlag> flag) throws IOException { checkOpen(); @@ -3107,6 +3139,16 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } } + public ECSchema[] getECSchemas() throws IOException { + checkOpen(); + TraceScope scope = Trace.startSpan("getECSchemas", traceSampler); + try { + return namenode.getECSchemas(); + } finally { + scope.close(); + } + } + public DFSInotifyEventInputStream getInotifyEventStream() throws IOException { checkOpen(); return new DFSInotifyEventInputStream(traceSampler, namenode); @@ -3181,10 +3223,51 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } } + /** + * Create thread pool for parallel reading in striped layout, + * STRIPED_READ_THREAD_POOL, if it does not already exist. + * @param num Number of threads for striped reads thread pool. + */ + private void initThreadsNumForStripedReads(int num) { + assert num > 0; + if (STRIPED_READ_THREAD_POOL != null) { + return; + } + synchronized (DFSClient.class) { + if (STRIPED_READ_THREAD_POOL == null) { + STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60, + TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), + new Daemon.DaemonFactory() { + private final AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread t = super.newThread(r); + t.setName("stripedRead-" + threadIndex.getAndIncrement()); + return t; + } + }, new ThreadPoolExecutor.CallerRunsPolicy() { + @Override + public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) { + LOG.info("Execution for striped reading rejected, " + + "Executing in current thread"); + // will run in the current thread + super.rejectedExecution(runnable, e); + } + }); + STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true); + } + } + } + ThreadPoolExecutor getHedgedReadsThreadPool() { return HEDGED_READ_THREAD_POOL; } + ThreadPoolExecutor getStripedReadsThreadPool() { + return STRIPED_READ_THREAD_POOL; + } + boolean isHedgedReadsEnabled() { return (HEDGED_READ_THREAD_POOL != null) && HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0; @@ -3249,4 +3332,24 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } return scope; } + + /** + * Get the erasure coding zone information for the specified path + * + * @param src path to get the information for + * @return Returns the zone information if path is in EC Zone, null otherwise + * @throws IOException + */ + public ErasureCodingZone getErasureCodingZone(String src) throws IOException { + checkOpen(); + TraceScope scope = getPathTraceScope("getErasureCodingZone", src); + try { + return namenode.getErasureCodingZone(src); + } catch (RemoteException re) { + throw re.unwrapRemoteException(FileNotFoundException.class, + AccessControlException.class, UnresolvedPathException.class); + } finally { + scope.close(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 4ef7a4d..59f14d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolarent; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker; import org.apache.hadoop.http.HttpConfig; @@ -160,6 +161,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT = 3; public static final String DFS_NAMENODE_REPLICATION_MIN_KEY = "dfs.namenode.replication.min"; public static final int DFS_NAMENODE_REPLICATION_MIN_DEFAULT = 1; + public static final String DFS_NAMENODE_STRIPE_MIN_KEY = "dfs.namenode.stripe.min"; + public static final int DFS_NAMENODE_STRIPE_MIN_DEFAULT = 1; public static final String DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY = "dfs.namenode.replication.pending.timeout-sec"; public static final int DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT = -1; public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY = "dfs.namenode.replication.max-streams"; @@ -371,6 +374,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600; public static final String DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads"; public static final int DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1; + public static final String DFS_DATANODE_STRIPED_READ_THREADS_KEY = "dfs.datanode.stripedread.threads"; + public static final int DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT = 20; + public static final String DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.stripedread.buffer.size"; + public static final int DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 64 * 1024; + public static final String DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY = "dfs.datanode.stripedread.threshold.millis"; + public static final int DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT = 5000; //5s public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface"; public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default"; public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver"; @@ -430,6 +439,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final Class<BlockPlacementPolicyDefault> DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class; public static final String DFS_REPLICATION_MAX_KEY = "dfs.replication.max"; public static final int DFS_REPLICATION_MAX_DEFAULT = 512; + public static final String DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY = "dfs.block.placement.ec.classname"; + public static final Class<BlockPlacementPolicyRackFaultTolarent> DFS_BLOCK_PLACEMENT_EC_CLASSNAME_DEFAULT = BlockPlacementPolicyRackFaultTolarent.class; public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval"; public static final int DFS_DF_INTERVAL_DEFAULT = 60000; @@ -679,7 +690,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.namenode.reject-unresolved-dn-topology-mapping"; public static final boolean DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT = false; - + + public static final String 
DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE = + "dfs.client.striped.read.threadpool.size"; + // With default 3+2 schema, each normal read could span 3 DNs. So this + // default value accommodates 6 read streams + public static final int DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE = 18; + // Slow io warning log threshold settings for dfsclient and datanode. public static final String DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY = "dfs.datanode.slow.io.warning.threshold.ms"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 7f3722f..7d83db6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -1140,41 +1140,25 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } /** - * Used when reading contiguous blocks - */ - private void actualGetFromOneDataNode(final DNAddrPair datanode, - LocatedBlock block, final long start, final long end, byte[] buf, - int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) - throws IOException { - final int length = (int) (end - start + 1); - actualGetFromOneDataNode(datanode, block, start, end, buf, - new int[]{offset}, new int[]{length}, corruptedBlockMap); - } - - /** * Read data from one DataNode. - * @param datanode the datanode from which to read data - * @param block the located block containing the requested data - * @param startInBlk the startInBlk offset of the block - * @param endInBlk the endInBlk offset of the block - * @param buf the given byte array into which the data is read - * @param offsets the data may be read into multiple segments of the buf - * (when reading a striped block). this array indicates the - * offset of each buf segment. 
- * @param lengths the length of each buf segment + * + * @param datanode the datanode from which to read data + * @param block the located block containing the requested data + * @param startInBlk the startInBlk offset of the block + * @param endInBlk the endInBlk offset of the block + * @param buf the given byte array into which the data is read + * @param offset the offset in buf * @param corruptedBlockMap map recording list of datanodes with corrupted * block replica */ - void actualGetFromOneDataNode(final DNAddrPair datanode, - LocatedBlock block, final long startInBlk, final long endInBlk, - byte[] buf, int[] offsets, int[] lengths, + void actualGetFromOneDataNode(final DNAddrPair datanode, LocatedBlock block, + final long startInBlk, final long endInBlk, byte[] buf, int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) throws IOException { DFSClientFaultInjector.get().startFetchFromDatanode(); int refetchToken = 1; // only need to get a new access token once int refetchEncryptionKey = 1; // only need to get a new encryption key once final int len = (int) (endInBlk - startInBlk + 1); - checkReadPortions(offsets, lengths, len); while (true) { // cached block locations may have been updated by chooseDataNode() @@ -1186,13 +1170,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, DFSClientFaultInjector.get().fetchFromDatanodeException(); reader = getBlockReader(block, startInBlk, len, datanode.addr, datanode.storageType, datanode.info); - for (int i = 0; i < offsets.length; i++) { - int nread = reader.readAll(buf, offsets[i], lengths[i]); - updateReadStatistics(readStatistics, nread, reader); - if (nread != lengths[i]) { - throw new IOException("truncated return from reader.read(): " + - "excpected " + lengths[i] + ", got " + nread); - } + int nread = reader.readAll(buf, offset, len); + updateReadStatistics(readStatistics, nread, reader); + if (nread != len) { + throw new IOException("truncated return from reader.read(): " + + "excpected " + len + ", got " + nread); } DFSClientFaultInjector.get().readFromDatanodeDelay(); return; @@ -1248,24 +1230,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } /** - * This method verifies that the read portions are valid and do not overlap - * with each other. - */ - private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) { - Preconditions.checkArgument(offsets.length == lengths.length && offsets.length > 0); - int sum = 0; - for (int i = 0; i < lengths.length; i++) { - if (i > 0) { - int gap = offsets[i] - offsets[i - 1]; - // make sure read portions do not overlap with each other - Preconditions.checkArgument(gap >= lengths[i - 1]); - } - sum += lengths[i]; - } - Preconditions.checkArgument(sum == totalLen); - } - - /** * Like {@link #fetchBlockByteRange}except we start up a second, parallel, * 'hedged' read if the first read is taking longer than configured amount of * time. We then wait on which ever read returns first. 
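The hedged-read behavior referenced in the javadoc above amounts to: submit the primary read, and if it has not finished within the configured threshold, submit a second read against another replica and take whichever completes first. A simplified, self-contained sketch of that pattern follows (the helper name and byte[] result are illustrative; the real method operates on block regions and datanode address pairs). As the next hunk shows, this path is now taken only for contiguous blocks, while striped blocks (blk.isStriped()) use the plain fetch.

// Simplified sketch of the hedged-read pattern described above; not the
// actual DFSInputStream implementation. Requires java.util.concurrent.*.
static byte[] hedgedRead(ExecutorService pool, Callable<byte[]> primary,
    Callable<byte[]> alternate, long thresholdMillis) throws Exception {
  ExecutorCompletionService<byte[]> ecs = new ExecutorCompletionService<>(pool);
  ecs.submit(primary);                                      // first read
  Future<byte[]> done = ecs.poll(thresholdMillis, TimeUnit.MILLISECONDS);
  if (done == null) {                                       // first read is slow
    ecs.submit(alternate);                                  // hedge with a second replica
    done = ecs.take();                                      // whichever read returns first wins
  }
  return done.get();
}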
@@ -1485,7 +1449,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, long targetStart = position - blk.getStartOffset(); long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart); try { - if (dfsClient.isHedgedReadsEnabled()) { + if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) { hedgedFetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap); } else { http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index c16aef2..404bbfc 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -24,6 +24,8 @@ import java.nio.channels.ClosedChannelException; import java.util.EnumSet; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.crypto.CryptoProtocolVersion; @@ -110,7 +112,7 @@ public class DFSOutputStream extends FSOutputSummer protected final int bytesPerChecksum; protected DFSPacket currentPacket = null; - private DataStreamer streamer; + protected DataStreamer streamer; protected int packetSize = 0; // write packet size, not including the header. 
protected int chunksPerPacket = 0; protected long lastFlushOffset = 0; // offset when flush was invoked @@ -269,8 +271,14 @@ public class DFSOutputStream extends FSOutputSummer } } Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!"); - final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, - flag, progress, checksum, favoredNodes); + final DFSOutputStream out; + if(stat.getECSchema() != null) { + out = new DFSStripedOutputStream(dfsClient, src, stat, + flag, progress, checksum, favoredNodes); + } else { + out = new DFSOutputStream(dfsClient, src, stat, + flag, progress, checksum, favoredNodes); + } out.start(); return out; } finally { @@ -350,6 +358,9 @@ public class DFSOutputStream extends FSOutputSummer String[] favoredNodes) throws IOException { TraceScope scope = dfsClient.getPathTraceScope("newStreamForAppend", src); + if(stat.getReplication() == 0) { + throw new IOException("Not support appending to a striping layout file yet."); + } try { final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags, progress, lastBlock, stat, checksum, favoredNodes); @@ -413,7 +424,6 @@ public class DFSOutputStream extends FSOutputSummer getStreamer().incBytesCurBlock(len); // If packet is full, enqueue it for transmission - // if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || getStreamer().getBytesCurBlock() == blockSize) { enqueueCurrentPacketFull(); @@ -907,4 +917,9 @@ public class DFSOutputStream extends FSOutputSummer protected DataStreamer getStreamer() { return streamer; } + + @Override + public String toString() { + return getClass().getSimpleName() + ":" + streamer; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java index 22055c3..a26e35e 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs; import java.io.DataOutputStream; import java.io.IOException; import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.Arrays; @@ -36,7 +37,7 @@ import org.apache.htrace.Span; ****************************************************************/ @InterfaceAudience.Private -class DFSPacket { +public class DFSPacket { public static final long HEART_BEAT_SEQNO = -1L; private static long[] EMPTY = new long[0]; private final long seqno; // sequence number of buffer in block @@ -79,7 +80,7 @@ class DFSPacket { * @param checksumSize the size of checksum * @param lastPacketInBlock if this is the last packet */ - DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno, + public DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno, int checksumSize, boolean lastPacketInBlock) { this.lastPacketInBlock = lastPacketInBlock; this.numChunks = 0; @@ -113,6 +114,19 @@ class DFSPacket { dataPos += len; } + public synchronized void writeData(ByteBuffer inBuffer, int len) + throws ClosedChannelException { + checkBuffer(); + len = len > inBuffer.remaining() ? 
inBuffer.remaining() : len; + if (dataPos + len > buf.length) { + throw new BufferOverflowException(); + } + for (int i = 0; i < len; i++) { + buf[dataPos + i] = inBuffer.get(); + } + dataPos += len; + } + /** * Write checksums to this packet * @@ -121,7 +135,7 @@ class DFSPacket { * @param len the length of checksums to write * @throws ClosedChannelException */ - synchronized void writeChecksum(byte[] inarray, int off, int len) + public synchronized void writeChecksum(byte[] inarray, int off, int len) throws ClosedChannelException { checkBuffer(); if (len == 0) { @@ -140,7 +154,7 @@ class DFSPacket { * @param stm * @throws IOException */ - synchronized void writeTo(DataOutputStream stm) throws IOException { + public synchronized void writeTo(DataOutputStream stm) throws IOException { checkBuffer(); final int dataLen = dataPos - dataStart; @@ -222,7 +236,7 @@ class DFSPacket { * * @return true if the packet is the last packet */ - boolean isLastPacketInBlock(){ + boolean isLastPacketInBlock() { return lastPacketInBlock; } @@ -231,7 +245,7 @@ class DFSPacket { * * @return the sequence number of this packet */ - long getSeqno(){ + long getSeqno() { return seqno; } @@ -240,14 +254,14 @@ class DFSPacket { * * @return the number of chunks in this packet */ - synchronized int getNumChunks(){ + synchronized int getNumChunks() { return numChunks; } /** * increase the number of chunks by one */ - synchronized void incNumChunks(){ + synchronized void incNumChunks() { numChunks++; } @@ -256,7 +270,7 @@ class DFSPacket { * * @return the maximum number of packets */ - int getMaxChunks(){ + int getMaxChunks() { return maxChunks; } @@ -265,7 +279,7 @@ class DFSPacket { * * @param syncBlock if to sync block */ - synchronized void setSyncBlock(boolean syncBlock){ + synchronized void setSyncBlock(boolean syncBlock) { this.syncBlock = syncBlock; }
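The new DFSPacket#writeData(ByteBuffer, int) above copies at most the source buffer's remaining bytes and throws BufferOverflowException when the packet's backing array cannot hold them. A standalone sketch of the same copy semantics, using a bulk get() in place of the per-byte loop (the helper class is illustrative, not HDFS code):

import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

final class BoundedBufferCopy {
  // Copy up to len bytes from src into dst at dstPos, never reading past the
  // source's remaining bytes and never overrunning the destination array.
  static int copy(ByteBuffer src, byte[] dst, int dstPos, int len) {
    len = Math.min(len, src.remaining());
    if (dstPos + len > dst.length) {
      throw new BufferOverflowException();
    }
    src.get(dst, dstPos, len);   // bulk get advances the position like the per-byte loop
    return len;                  // bytes actually copied
  }
}

ByteBuffer.get(byte[], int, int) advances the buffer position exactly as the per-byte loop does, so the bulk form is behaviorally equivalent for this copy.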

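Taken together, the DFSOutputStream changes above gate the new striped layout on the file's status: newStreamForCreate picks DFSStripedOutputStream when the HdfsFileStatus carries a non-null ECSchema, and newStreamForAppend rejects striped files, which this patch identifies by a replication of 0. A hedged caller-side sketch of the same guard (the helper class is illustrative, not part of HDFS):

import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

final class StripedLayoutGuard {
  // Mirrors the append guard above: erasure-coded files carry an ECSchema in
  // their status and report replication 0, and appending to them is not yet
  // supported by this patch.
  static void checkAppendable(HdfsFileStatus stat) throws IOException {
    if (stat.getECSchema() != null || stat.getReplication() == 0) {
      throw new IOException("Append to a striped (erasure-coded) file is not supported yet");
    }
  }
}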