Merge remote-tracking branch 'apache/trunk' into HDFS-7285
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ab56fcdb Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ab56fcdb Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ab56fcdb Branch: refs/heads/HDFS-7285 Commit: ab56fcdb1219d03713b408dd3a95d7405635254d Parents: 164cbe6 cbb2495 Author: Zhe Zhang <zhezh...@cloudera.com> Authored: Thu Aug 27 16:23:41 2015 -0700 Committer: Zhe Zhang <zhezh...@cloudera.com> Committed: Tue Sep 1 14:30:25 2015 -0700 ---------------------------------------------------------------------- .../server/AuthenticationFilter.java | 63 +- .../server/AuthenticationToken.java | 12 + .../security/authentication/util/AuthToken.java | 35 +- .../server/TestAuthenticationFilter.java | 163 ++- hadoop-common-project/hadoop-common/CHANGES.txt | 34 + .../src/main/conf/log4j.properties | 13 + .../fs/CommonConfigurationKeysPublic.java | 5 + .../java/org/apache/hadoop/fs/CreateFlag.java | 2 +- .../apache/hadoop/fs/TrashPolicyDefault.java | 11 +- .../apache/hadoop/ipc/ProtobufRpcEngine.java | 5 +- .../main/java/org/apache/hadoop/ipc/Server.java | 60 + .../apache/hadoop/ipc/WritableRpcEngine.java | 3 + .../apache/hadoop/ipc/metrics/RpcMetrics.java | 48 + .../apache/hadoop/metrics2/lib/MutableStat.java | 7 +- .../org/apache/hadoop/metrics2/util/MBeans.java | 37 +- .../org/apache/hadoop/util/HostsFileReader.java | 7 +- .../main/java/org/apache/hadoop/util/Shell.java | 11 +- .../org/apache/hadoop/util/StringUtils.java | 29 +- .../src/main/resources/core-default.xml | 9 + .../src/site/markdown/HttpAuthentication.md | 8 +- .../hadoop-common/src/site/markdown/Metrics.md | 2 + .../src/site/markdown/SingleCluster.md.vm | 2 +- .../org/apache/hadoop/ipc/TestProtoBufRpc.java | 77 +- .../org/apache/hadoop/test/MetricsAsserts.java | 2 +- .../java/org/apache/hadoop/util/TestShell.java | 39 + .../hadoop-common/src/test/proto/test.proto | 7 + .../src/test/proto/test_rpc_service.proto | 1 + .../dev-support/findbugsExcludeFile.xml | 10 + .../org/apache/hadoop/hdfs/DFSUtilClient.java | 26 + .../org/apache/hadoop/hdfs/ExtendedBlockId.java | 82 ++ .../org/apache/hadoop/hdfs/ReplicaAccessor.java | 88 ++ .../hadoop/hdfs/ReplicaAccessorBuilder.java | 101 ++ .../hdfs/client/HdfsClientConfigKeys.java | 76 +- .../hadoop/hdfs/client/impl/DfsClientConf.java | 794 +++++++++++++ .../hadoop/hdfs/client/impl/package-info.java | 18 + .../org/apache/hadoop/hdfs/net/DomainPeer.java | 132 +++ .../java/org/apache/hadoop/hdfs/net/Peer.java | 123 ++ .../hadoop/hdfs/protocol/HdfsConstants.java | 7 + .../datatransfer/BlockConstructionStage.java | 62 + .../datatransfer/DataTransferProtoUtil.java | 146 +++ .../datatransfer/DataTransferProtocol.java | 202 ++++ .../hadoop/hdfs/protocol/datatransfer/Op.java | 66 ++ .../hdfs/protocol/datatransfer/Sender.java | 261 +++++ .../hadoop/hdfs/protocolPB/PBHelperClient.java | 254 +++++ .../token/block/InvalidBlockTokenException.java | 41 + .../server/datanode/BlockMetadataHeader.java | 209 ++++ .../hdfs/server/datanode/CachingStrategy.java | 76 ++ .../hadoop/hdfs/shortcircuit/ClientMmap.java | 75 ++ .../hadoop/hdfs/shortcircuit/DfsClientShm.java | 119 ++ .../hdfs/shortcircuit/DfsClientShmManager.java | 522 +++++++++ .../hdfs/shortcircuit/DomainSocketFactory.java | 196 ++++ .../hdfs/shortcircuit/ShortCircuitCache.java | 1066 +++++++++++++++++ .../hdfs/shortcircuit/ShortCircuitReplica.java | 352 ++++++ .../shortcircuit/ShortCircuitReplicaInfo.java | 64 ++ .../hdfs/shortcircuit/ShortCircuitShm.java | 647 +++++++++++ .../hadoop/hdfs/util/ByteArrayManager.java | 422 +++++++ .../hadoop/hdfs/util/ExactSizeInputStream.java | 125 ++ .../apache/hadoop/hdfs/util/IOUtilsClient.java | 46 + .../apache/hadoop/hdfs/util/package-info.java | 18 + .../hadoop/hdfs/web/WebHdfsFileSystem.java | 20 + .../hdfs/web/resources/CreateFlagParam.java | 48 + .../hdfs/web/resources/CreateParentParam.java | 2 +- .../src/main/proto/ClientDatanodeProtocol.proto | 33 - .../src/main/proto/datatransfer.proto | 4 + hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 125 +- .../dev-support/findbugsExcludeFile.xml | 10 - .../hadoop-hdfs/src/CMakeLists.txt | 1 + .../apache/hadoop/fs/BlockStorageLocation.java | 52 - .../java/org/apache/hadoop/fs/HdfsVolumeId.java | 73 -- .../java/org/apache/hadoop/fs/VolumeId.java | 40 - .../apache/hadoop/hdfs/BlockReaderFactory.java | 65 +- .../hadoop/hdfs/BlockStorageLocationUtil.java | 368 ------ .../org/apache/hadoop/hdfs/ClientContext.java | 5 +- .../java/org/apache/hadoop/hdfs/DFSClient.java | 134 +-- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 186 ++- .../org/apache/hadoop/hdfs/DFSOutputStream.java | 3 +- .../org/apache/hadoop/hdfs/DataStreamer.java | 6 +- .../hadoop/hdfs/DistributedFileSystem.java | 39 - .../org/apache/hadoop/hdfs/ExtendedBlockId.java | 82 -- .../apache/hadoop/hdfs/ExternalBlockReader.java | 126 +++ .../apache/hadoop/hdfs/HdfsConfiguration.java | 8 +- .../apache/hadoop/hdfs/RemoteBlockReader.java | 6 +- .../apache/hadoop/hdfs/RemoteBlockReader2.java | 6 +- .../org/apache/hadoop/hdfs/XAttrHelper.java | 13 +- .../hadoop/hdfs/client/impl/DfsClientConf.java | 765 ------------- .../org/apache/hadoop/hdfs/net/DomainPeer.java | 132 --- .../java/org/apache/hadoop/hdfs/net/Peer.java | 123 -- .../hdfs/protocol/ClientDatanodeProtocol.java | 19 - .../hdfs/protocol/HdfsBlocksMetadata.java | 111 -- .../datatransfer/BlockConstructionStage.java | 62 - .../datatransfer/DataTransferProtoUtil.java | 148 --- .../datatransfer/DataTransferProtocol.java | 201 ---- .../hadoop/hdfs/protocol/datatransfer/Op.java | 65 -- .../hdfs/protocol/datatransfer/PipelineAck.java | 2 +- .../hdfs/protocol/datatransfer/Receiver.java | 8 +- .../hdfs/protocol/datatransfer/Sender.java | 261 ----- .../datatransfer/sasl/DataTransferSaslUtil.java | 2 +- ...tDatanodeProtocolServerSideTranslatorPB.java | 43 +- .../ClientDatanodeProtocolTranslatorPB.java | 49 +- ...tNamenodeProtocolServerSideTranslatorPB.java | 6 +- .../ClientNamenodeProtocolTranslatorPB.java | 28 +- .../DatanodeProtocolClientSideTranslatorPB.java | 4 +- .../InterDatanodeProtocolTranslatorPB.java | 2 +- .../NamenodeProtocolTranslatorPB.java | 2 +- .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 237 +--- .../token/block/InvalidBlockTokenException.java | 41 - .../hadoop/hdfs/server/balancer/Balancer.java | 133 ++- .../hadoop/hdfs/server/balancer/Dispatcher.java | 46 +- .../server/blockmanagement/BlockCollection.java | 9 +- .../hdfs/server/blockmanagement/BlockInfo.java | 59 +- .../blockmanagement/BlockInfoContiguous.java | 11 - .../blockmanagement/BlockInfoStriped.java | 5 - .../server/blockmanagement/BlockManager.java | 115 +- .../BlockPlacementPolicyDefault.java | 177 +-- .../BlockPlacementPolicyWithNodeGroup.java | 35 +- .../BlockStoragePolicySuite.java | 5 +- .../BlockUnderConstructionFeature.java | 31 +- .../hdfs/server/blockmanagement/BlocksMap.java | 14 +- .../blockmanagement/DatanodeDescriptor.java | 26 +- .../server/blockmanagement/DatanodeManager.java | 3 +- .../blockmanagement/DecommissionManager.java | 21 +- .../SequentialBlockGroupIdGenerator.java | 5 +- .../SequentialBlockIdGenerator.java | 5 +- .../hdfs/server/common/HdfsServerConstants.java | 6 - .../server/datanode/BlockMetadataHeader.java | 211 ---- .../hdfs/server/datanode/CachingStrategy.java | 76 -- .../hadoop/hdfs/server/datanode/DNConf.java | 17 +- .../hadoop/hdfs/server/datanode/DataNode.java | 44 +- .../hdfs/server/datanode/DataXceiver.java | 23 +- .../server/datanode/SecureDataNodeStarter.java | 4 +- .../server/datanode/fsdataset/FsDatasetSpi.java | 13 - .../datanode/fsdataset/impl/BlockPoolSlice.java | 2 +- .../datanode/fsdataset/impl/FsDatasetImpl.java | 64 +- .../impl/RamDiskAsyncLazyPersistService.java | 8 +- .../datanode/web/webhdfs/ParameterParser.java | 14 + .../datanode/web/webhdfs/WebHdfsHandler.java | 23 +- .../hdfs/server/namenode/BackupImage.java | 8 +- .../namenode/ErasureCodingZoneManager.java | 2 +- .../hdfs/server/namenode/FSDirAppendOp.java | 2 +- .../hdfs/server/namenode/FSDirAttrOp.java | 49 +- .../hdfs/server/namenode/FSDirConcatOp.java | 4 +- .../hdfs/server/namenode/FSDirDeleteOp.java | 5 +- .../hdfs/server/namenode/FSDirRenameOp.java | 7 +- .../hdfs/server/namenode/FSDirSnapshotOp.java | 2 + .../hdfs/server/namenode/FSDirTruncateOp.java | 2 +- .../hdfs/server/namenode/FSDirWriteFileOp.java | 3 +- .../hdfs/server/namenode/FSDirXAttrOp.java | 29 +- .../hdfs/server/namenode/FSDirectory.java | 77 +- .../hadoop/hdfs/server/namenode/FSEditLog.java | 11 + .../hdfs/server/namenode/FSEditLogLoader.java | 8 +- .../hadoop/hdfs/server/namenode/FSImage.java | 153 ++- .../server/namenode/FSImageFormatPBINode.java | 11 +- .../server/namenode/FSImageSerialization.java | 4 +- .../hdfs/server/namenode/FSNamesystem.java | 70 +- .../namenode/FileUnderConstructionFeature.java | 2 +- .../hadoop/hdfs/server/namenode/INode.java | 52 +- .../hdfs/server/namenode/INodeDirectory.java | 11 +- .../hadoop/hdfs/server/namenode/INodeFile.java | 34 +- .../hadoop/hdfs/server/namenode/INodeId.java | 1 + .../hadoop/hdfs/server/namenode/NameNode.java | 176 ++- .../hdfs/server/namenode/NamenodeFsck.java | 20 +- .../hadoop/hdfs/server/namenode/Namesystem.java | 7 +- .../hdfs/server/namenode/QuotaCounts.java | 10 +- .../server/namenode/SerialNumberManager.java | 44 - .../hdfs/server/namenode/SerialNumberMap.java | 79 ++ .../hdfs/server/namenode/XAttrFeature.java | 78 +- .../hdfs/server/namenode/XAttrFormat.java | 161 +++ .../server/namenode/XAttrPermissionFilter.java | 6 +- .../hdfs/server/namenode/XAttrStorage.java | 62 +- .../namenode/metrics/FSNamesystemMBean.java | 10 + .../snapshot/FSImageFormatPBSnapshot.java | 10 +- .../snapshot/FileWithSnapshotFeature.java | 44 +- .../web/resources/NamenodeWebHdfsMethods.java | 20 +- .../hdfs/server/protocol/NamespaceInfo.java | 17 +- .../hadoop/hdfs/shortcircuit/ClientMmap.java | 75 -- .../hadoop/hdfs/shortcircuit/DfsClientShm.java | 119 -- .../hdfs/shortcircuit/DfsClientShmManager.java | 514 --------- .../hdfs/shortcircuit/DomainSocketFactory.java | 194 ---- .../hdfs/shortcircuit/ShortCircuitCache.java | 1068 ------------------ .../hdfs/shortcircuit/ShortCircuitReplica.java | 349 ------ .../shortcircuit/ShortCircuitReplicaInfo.java | 64 -- .../hdfs/shortcircuit/ShortCircuitShm.java | 646 ----------- .../org/apache/hadoop/hdfs/tools/DFSAdmin.java | 5 + .../hadoop/hdfs/util/ByteArrayManager.java | 418 ------- .../hadoop/hdfs/util/ExactSizeInputStream.java | 125 -- .../hadoop/hdfs/util/LightWeightHashSet.java | 21 +- .../org/apache/hadoop/hdfs/web/JsonUtil.java | 6 +- .../src/main/resources/hdfs-default.xml | 50 +- .../hadoop-hdfs/src/site/markdown/HdfsDesign.md | 5 +- .../hadoop-hdfs/src/site/markdown/WebHDFS.md | 21 +- .../org/apache/hadoop/hdfs/TestFiPipelines.java | 9 +- .../datanode/TestFiDataTransferProtocol.java | 3 +- .../datanode/TestFiDataTransferProtocol2.java | 5 +- .../hadoop/fs/TestEnhancedByteBufferAccess.java | 10 +- .../java/org/apache/hadoop/fs/TestUnbuffer.java | 7 +- .../java/org/apache/hadoop/fs/TestVolumeId.java | 146 --- .../fs/viewfs/TestViewFsDefaultValue.java | 8 +- .../apache/hadoop/hdfs/BlockReaderTestUtil.java | 6 +- .../org/apache/hadoop/hdfs/FileAppendTest4.java | 5 +- .../org/apache/hadoop/hdfs/MiniDFSCluster.java | 1 + .../hadoop/hdfs/TestAppendSnapshotTruncate.java | 13 +- .../hadoop/hdfs/TestBlockReaderFactory.java | 4 +- .../hadoop/hdfs/TestBlockReaderLocal.java | 4 +- .../hadoop/hdfs/TestBlockReaderLocalLegacy.java | 6 +- .../TestClientProtocolForPipelineRecovery.java | 4 +- .../org/apache/hadoop/hdfs/TestConnCache.java | 5 +- .../hadoop/hdfs/TestDFSClientRetries.java | 8 +- .../hdfs/TestDFSInotifyEventInputStream.java | 2 +- .../apache/hadoop/hdfs/TestDFSInputStream.java | 2 +- .../org/apache/hadoop/hdfs/TestDFSUpgrade.java | 78 +- .../hadoop/hdfs/TestDFSUpgradeFromImage.java | 107 +- .../hadoop/hdfs/TestDataTransferKeepalive.java | 8 +- .../hadoop/hdfs/TestDataTransferProtocol.java | 8 +- .../apache/hadoop/hdfs/TestDatanodeDeath.java | 5 +- .../hadoop/hdfs/TestDisableConnCache.java | 3 +- .../hadoop/hdfs/TestDistributedFileSystem.java | 291 +---- .../hadoop/hdfs/TestExternalBlockReader.java | 298 +++++ .../org/apache/hadoop/hdfs/TestFileAppend.java | 4 +- .../org/apache/hadoop/hdfs/TestFileAppend2.java | 6 +- .../org/apache/hadoop/hdfs/TestFileAppend4.java | 5 +- .../apache/hadoop/hdfs/TestFileCreation.java | 165 +-- .../java/org/apache/hadoop/hdfs/TestHFlush.java | 3 +- .../apache/hadoop/hdfs/TestParallelRead.java | 2 +- .../TestParallelShortCircuitLegacyRead.java | 4 +- .../TestParallelShortCircuitReadUnCached.java | 6 +- .../hadoop/hdfs/TestParallelUnixDomainRead.java | 2 +- .../org/apache/hadoop/hdfs/TestPipelines.java | 9 +- .../java/org/apache/hadoop/hdfs/TestPread.java | 6 +- .../java/org/apache/hadoop/hdfs/TestRead.java | 5 +- .../hadoop/hdfs/TestRemoteBlockReader.java | 4 +- .../hdfs/TestReplaceDatanodeOnFailure.java | 4 +- .../hdfs/protocol/TestBlockListAsLongs.java | 9 +- .../hadoop/hdfs/protocolPB/TestPBHelper.java | 20 +- .../hdfs/server/balancer/TestBalancer.java | 166 ++- .../server/blockmanagement/TestBlockInfo.java | 17 +- .../blockmanagement/TestBlockInfoStriped.java | 4 +- .../TestBlockInfoUnderConstruction.java | 80 -- .../blockmanagement/TestBlockManager.java | 18 +- .../TestBlockReportRateLimiting.java | 2 - .../blockmanagement/TestBlockTokenWithDFS.java | 6 +- .../TestBlockUnderConstructionFeature.java | 80 ++ .../blockmanagement/TestPendingReplication.java | 1 - .../blockmanagement/TestReplicationPolicy.java | 94 +- .../server/datanode/SimulatedFSDataset.java | 7 - .../server/datanode/TestBlockReplacement.java | 7 +- .../server/datanode/TestCachingStrategy.java | 18 +- .../datanode/TestDataNodeVolumeFailure.java | 6 +- .../extdataset/ExternalDatasetImpl.java | 6 - .../fsdataset/impl/LazyPersistTestCase.java | 5 +- .../fsdataset/impl/TestDatanodeRestart.java | 7 +- .../TestCommitBlockSynchronization.java | 9 +- .../TestDefaultBlockPlacementPolicy.java | 49 +- .../namenode/TestDiskspaceQuotaUpdate.java | 64 ++ .../namenode/TestFSImageWithSnapshot.java | 4 +- .../server/namenode/TestFSNamesystemMBean.java | 34 +- .../hadoop/hdfs/server/namenode/TestFsck.java | 56 +- .../hdfs/server/namenode/TestINodeFile.java | 7 +- .../namenode/TestNameNodeMetricsLogger.java | 193 ++++ .../hdfs/server/namenode/TestStartup.java | 27 +- .../hdfs/server/namenode/TestXAttrFeature.java | 119 ++ .../namenode/snapshot/SnapshotTestHelper.java | 4 +- .../snapshot/TestFileWithSnapshotFeature.java | 7 +- .../snapshot/TestSnapshotBlocksMap.java | 30 +- .../namenode/snapshot/TestSnapshotDeletion.java | 25 +- .../snapshot/TestSnapshotReplication.java | 31 +- .../shortcircuit/TestShortCircuitCache.java | 16 +- .../shortcircuit/TestShortCircuitLocalRead.java | 106 +- .../hadoop/hdfs/tools/TestDFSAdminWithHA.java | 7 + .../hdfs/util/TestLightWeightHashSet.java | 29 +- .../org/apache/hadoop/hdfs/web/TestWebHDFS.java | 25 + .../src/test/resources/hadoop-252-dfs-dir.tgz | Bin 0 -> 14112 bytes .../src/test/resources/log4j.properties | 13 + hadoop-mapreduce-project/CHANGES.txt | 23 + .../apache/hadoop/mapred/LocalJobRunner.java | 27 + .../mapreduce/lib/output/MultipleOutputs.java | 14 +- .../hadoop/mapred/ResourceMgrDelegate.java | 7 + .../hadoop/mapred/TestClientRedirect.java | 9 + .../hadoop/mapred/TestLocalJobSubmission.java | 25 + hadoop-project/pom.xml | 4 +- .../org/apache/hadoop/tools/CopyListing.java | 15 +- .../java/org/apache/hadoop/tools/DiffInfo.java | 32 +- .../java/org/apache/hadoop/tools/DistCp.java | 27 +- .../org/apache/hadoop/tools/DistCpOptions.java | 4 +- .../org/apache/hadoop/tools/DistCpSync.java | 308 ++++- .../apache/hadoop/tools/SimpleCopyListing.java | 151 ++- .../org/apache/hadoop/tools/TestDistCpSync.java | 345 +++++- .../apache/hadoop/tools/TestOptionsParser.java | 22 +- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 3 + .../yarn/sls/scheduler/RMNodeWrapper.java | 5 + hadoop-yarn-project/CHANGES.txt | 48 +- .../yarn/api/ApplicationClientProtocol.java | 18 + .../UpdateApplicationPriorityRequest.java | 80 ++ .../UpdateApplicationPriorityResponse.java | 47 + .../yarn/api/records/LogAggregationContext.java | 95 ++ .../hadoop/yarn/conf/YarnConfiguration.java | 36 + .../api/ContainerLogAggregationPolicy.java | 54 + .../yarn/server/api/ContainerLogContext.java | 71 ++ .../ResourceManagerAdministrationProtocol.java | 8 + .../RefreshClusterMaxPriorityRequest.java | 35 + .../RefreshClusterMaxPriorityResponse.java | 36 + .../main/proto/applicationclient_protocol.proto | 1 + ...esourcemanager_administration_protocol.proto | 1 + ..._server_resourcemanager_service_protos.proto | 5 + .../src/main/proto/yarn_protos.proto | 2 + .../src/main/proto/yarn_service_protos.proto | 8 + .../hadoop/yarn/client/api/YarnClient.java | 17 + .../yarn/client/api/impl/YarnClientImpl.java | 11 + .../hadoop/yarn/client/cli/ApplicationCLI.java | 29 + .../hadoop/yarn/client/cli/RMAdminCLI.java | 15 + .../hadoop/yarn/client/cli/TestRMAdminCLI.java | 9 + .../hadoop/yarn/client/cli/TestYarnCLI.java | 29 + .../ApplicationClientProtocolPBClientImpl.java | 20 + .../ApplicationClientProtocolPBServiceImpl.java | 22 + .../UpdateApplicationPriorityRequestPBImpl.java | 171 +++ ...UpdateApplicationPriorityResponsePBImpl.java | 69 ++ .../impl/pb/LogAggregationContextPBImpl.java | 40 + .../ContainerLogsRetentionPolicy.java | 29 - .../nodelabels/CommonNodeLabelsManager.java | 2 +- ...nagerAdministrationProtocolPBClientImpl.java | 20 + ...agerAdministrationProtocolPBServiceImpl.java | 23 + .../RefreshClusterMaxPriorityRequestPBImpl.java | 74 ++ ...RefreshClusterMaxPriorityResponsePBImpl.java | 73 ++ .../src/main/resources/yarn-default.xml | 71 ++ .../hadoop-yarn/hadoop-yarn-registry/pom.xml | 4 +- .../hadoop/yarn/server/webapp/AppBlock.java | 6 +- .../yarn/server/nodemanager/NodeManager.java | 39 +- .../nodemanager/NodeStatusUpdaterImpl.java | 259 +++-- .../application/ApplicationImpl.java | 5 +- .../AMOnlyLogAggregationPolicy.java | 31 + ...AMOrFailedContainerLogAggregationPolicy.java | 35 + .../AbstractContainerLogAggregationPolicy.java | 31 + .../AllContainerLogAggregationPolicy.java | 30 + .../logaggregation/AppLogAggregator.java | 5 +- .../logaggregation/AppLogAggregatorImpl.java | 131 ++- .../FailedContainerLogAggregationPolicy.java | 33 + ...edOrKilledContainerLogAggregationPolicy.java | 30 + .../logaggregation/LogAggregationService.java | 19 +- .../NoneContainerLogAggregationPolicy.java | 30 + .../SampleContainerLogAggregationPolicy.java | 124 ++ .../event/LogHandlerAppStartedEvent.java | 15 +- .../monitor/ContainersMonitorImpl.java | 10 +- .../nodelabels/AbstractNodeLabelsProvider.java | 146 +++ .../ConfigurationNodeLabelsProvider.java | 81 ++ .../server/nodemanager/TestNodeManager.java | 50 +- .../TestNodeStatusUpdaterForLabels.java | 76 +- .../containermanager/TestAuxServices.java | 1 + .../TestLogAggregationService.java | 677 +++++++++-- .../TestNonAggregatingLogHandler.java | 12 +- .../TestConfigurationNodeLabelsProvider.java | 146 +++ .../server/resourcemanager/AdminService.java | 28 + .../ApplicationMasterService.java | 3 +- .../server/resourcemanager/ClientRMService.java | 76 ++ .../server/resourcemanager/RMAuditLogger.java | 2 + .../resourcemanager/ResourceTrackerService.java | 2 + .../resourcemanager/recovery/RMStateStore.java | 12 +- .../recovery/RMStateUpdateAppEvent.java | 13 + .../server/resourcemanager/rmapp/RMAppImpl.java | 3 +- .../rmapp/attempt/RMAppAttemptImpl.java | 8 +- .../server/resourcemanager/rmnode/RMNode.java | 7 +- .../resourcemanager/rmnode/RMNodeImpl.java | 15 +- .../scheduler/AbstractYarnScheduler.java | 25 + .../scheduler/YarnScheduler.java | 20 + .../scheduler/capacity/CapacityScheduler.java | 25 +- .../ClientToAMTokenSecretManagerInRM.java | 7 + .../yarn/server/resourcemanager/MockNodes.java | 4 + .../resourcemanager/TestClientRMService.java | 75 ++ .../resourcemanager/TestRMAdminService.java | 34 + .../TestWorkPreservingRMRestart.java | 2 +- .../resourcetracker/TestNMReconnect.java | 39 + .../attempt/TestRMAppAttemptTransitions.java | 32 + .../capacity/TestContainerAllocation.java | 12 +- 371 files changed, 15142 insertions(+), 9357 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index 0000000,d46ab47..b99e3ba mode 000000,100644..100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@@ -1,0 -1,777 +1,794 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hdfs.client.impl; + + import com.google.common.annotations.VisibleForTesting; ++import com.google.common.base.Preconditions; + import org.apache.hadoop.HadoopIllegalArgumentException; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.CommonConfigurationKeysPublic; + import org.apache.hadoop.fs.Options.ChecksumOpt; + import org.apache.hadoop.fs.permission.FsPermission; + import org.apache.hadoop.hdfs.ReplicaAccessorBuilder; + import org.apache.hadoop.hdfs.protocol.HdfsConstants; + import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; + import org.apache.hadoop.hdfs.util.ByteArrayManager; + import org.apache.hadoop.ipc.Client; + import org.apache.hadoop.util.DataChecksum; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_KEY; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_KEY; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.HedgedRead; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Mmap; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Read; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Retry; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ShortCircuit; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write; + + import java.lang.Class; + import java.util.ArrayList; + import java.util.Collections; + import java.util.List; + + /** + * DFSClient configuration. + */ + public class DfsClientConf { + private static final Logger LOG = LoggerFactory.getLogger(DfsClientConf + .class); + + private final int hdfsTimeout; // timeout value for a DFS operation. + + private final int maxFailoverAttempts; + private final int maxRetryAttempts; + private final int failoverSleepBaseMillis; + private final int failoverSleepMaxMillis; + private final int maxBlockAcquireFailures; + private final int datanodeSocketWriteTimeout; + private final int ioBufferSize; + private final ChecksumOpt defaultChecksumOpt; + private final int writePacketSize; + private final int writeMaxPackets; + private final ByteArrayManager.Conf writeByteArrayManagerConf; + private final int socketTimeout; + private final long excludedNodesCacheExpiry; + /** Wait time window (in msec) if BlockMissingException is caught. */ + private final int timeWindow; + private final int numCachedConnRetry; + private final int numBlockWriteRetry; + private final int numBlockWriteLocateFollowingRetry; + private final int blockWriteLocateFollowingInitialDelayMs; + private final long defaultBlockSize; + private final long prefetchSize; + private final short defaultReplication; + private final String taskId; + private final FsPermission uMask; + private final boolean connectToDnViaHostname; + private final int retryTimesForGetLastBlockLength; + private final int retryIntervalForGetLastBlockLength; + private final long datanodeRestartTimeout; + private final long slowIoWarningThresholdMs; + + private final ShortCircuitConf shortCircuitConf; + + private final long hedgedReadThresholdMillis; + private final int hedgedReadThreadpoolSize; + private final List<Class<? extends ReplicaAccessorBuilder>> + replicaAccessorBuilderClasses; + ++ private final int stripedReadThreadpoolSize; ++ ++ + public DfsClientConf(Configuration conf) { + // The hdfsTimeout is currently the same as the ipc timeout + hdfsTimeout = Client.getTimeout(conf); + + maxRetryAttempts = conf.getInt( + Retry.MAX_ATTEMPTS_KEY, + Retry.MAX_ATTEMPTS_DEFAULT); + timeWindow = conf.getInt( + Retry.WINDOW_BASE_KEY, + Retry.WINDOW_BASE_DEFAULT); + retryTimesForGetLastBlockLength = conf.getInt( + Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY, + Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT); + retryIntervalForGetLastBlockLength = conf.getInt( + Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_KEY, + Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT); + + maxFailoverAttempts = conf.getInt( + Failover.MAX_ATTEMPTS_KEY, + Failover.MAX_ATTEMPTS_DEFAULT); + failoverSleepBaseMillis = conf.getInt( + Failover.SLEEPTIME_BASE_KEY, + Failover.SLEEPTIME_BASE_DEFAULT); + failoverSleepMaxMillis = conf.getInt( + Failover.SLEEPTIME_MAX_KEY, + Failover.SLEEPTIME_MAX_DEFAULT); + + maxBlockAcquireFailures = conf.getInt( + DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, + DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT); + datanodeSocketWriteTimeout = conf.getInt( + DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, + HdfsConstants.WRITE_TIMEOUT); + ioBufferSize = conf.getInt( + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); + defaultChecksumOpt = getChecksumOptFromConf(conf); + socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, + HdfsConstants.READ_TIMEOUT); + /** dfs.write.packet.size is an internal config variable */ + writePacketSize = conf.getInt( + DFS_CLIENT_WRITE_PACKET_SIZE_KEY, + DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); + writeMaxPackets = conf.getInt( + Write.MAX_PACKETS_IN_FLIGHT_KEY, + Write.MAX_PACKETS_IN_FLIGHT_DEFAULT); + + final boolean byteArrayManagerEnabled = conf.getBoolean( + Write.ByteArrayManager.ENABLED_KEY, + Write.ByteArrayManager.ENABLED_DEFAULT); + if (!byteArrayManagerEnabled) { + writeByteArrayManagerConf = null; + } else { + final int countThreshold = conf.getInt( + Write.ByteArrayManager.COUNT_THRESHOLD_KEY, + Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT); + final int countLimit = conf.getInt( + Write.ByteArrayManager.COUNT_LIMIT_KEY, + Write.ByteArrayManager.COUNT_LIMIT_DEFAULT); + final long countResetTimePeriodMs = conf.getLong( + Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY, + Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT); + writeByteArrayManagerConf = new ByteArrayManager.Conf( + countThreshold, countLimit, countResetTimePeriodMs); + } + + defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY, + DFS_BLOCK_SIZE_DEFAULT); + defaultReplication = (short) conf.getInt( + DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT); + taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE"); + excludedNodesCacheExpiry = conf.getLong( + Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY, + Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT); + prefetchSize = conf.getLong(Read.PREFETCH_SIZE_KEY, + 10 * defaultBlockSize); + numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY, + DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT); + numBlockWriteRetry = conf.getInt( + BlockWrite.RETRIES_KEY, + BlockWrite.RETRIES_DEFAULT); + numBlockWriteLocateFollowingRetry = conf.getInt( + BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, + BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT); + blockWriteLocateFollowingInitialDelayMs = conf.getInt( + BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY, + BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_DEFAULT); + uMask = FsPermission.getUMask(conf); + connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, + DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); + + datanodeRestartTimeout = conf.getLong( + DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, + DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000; + slowIoWarningThresholdMs = conf.getLong( + DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY, + DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT); + + shortCircuitConf = new ShortCircuitConf(conf); + + hedgedReadThresholdMillis = conf.getLong( + HedgedRead.THRESHOLD_MILLIS_KEY, + HedgedRead.THRESHOLD_MILLIS_DEFAULT); + hedgedReadThreadpoolSize = conf.getInt( + HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, + HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT); + ++ stripedReadThreadpoolSize = conf.getInt( ++ HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY, ++ HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_DEFAULT); ++ Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " + ++ HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY + ++ " must be greater than 0."); + replicaAccessorBuilderClasses = loadReplicaAccessorBuilderClasses(conf); + } + + @SuppressWarnings("unchecked") + private List<Class<? extends ReplicaAccessorBuilder>> + loadReplicaAccessorBuilderClasses(Configuration conf) + { + String classNames[] = conf.getTrimmedStrings( + HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY); + if (classNames.length == 0) { + return Collections.emptyList(); + } + ArrayList<Class<? extends ReplicaAccessorBuilder>> classes = + new ArrayList<>(); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + for (String className: classNames) { + try { + Class<? extends ReplicaAccessorBuilder> cls = + (Class<? extends ReplicaAccessorBuilder>) + classLoader.loadClass(className); + classes.add(cls); + } catch (Throwable t) { + LOG.warn("Unable to load " + className, t); + } + } + return classes; + } + + private DataChecksum.Type getChecksumType(Configuration conf) { + final String checksum = conf.get( + DFS_CHECKSUM_TYPE_KEY, + DFS_CHECKSUM_TYPE_DEFAULT); + try { + return DataChecksum.Type.valueOf(checksum); + } catch(IllegalArgumentException iae) { + LOG.warn("Bad checksum type: {}. Using default {}", checksum, + DFS_CHECKSUM_TYPE_DEFAULT); + return DataChecksum.Type.valueOf( + DFS_CHECKSUM_TYPE_DEFAULT); + } + } + + // Construct a checksum option from conf + private ChecksumOpt getChecksumOptFromConf(Configuration conf) { + DataChecksum.Type type = getChecksumType(conf); + int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, + DFS_BYTES_PER_CHECKSUM_DEFAULT); + return new ChecksumOpt(type, bytesPerChecksum); + } + + /** create a DataChecksum with the given option. */ + public DataChecksum createChecksum(ChecksumOpt userOpt) { + // Fill in any missing field with the default. + ChecksumOpt opt = ChecksumOpt.processChecksumOpt( + defaultChecksumOpt, userOpt); + DataChecksum dataChecksum = DataChecksum.newDataChecksum( + opt.getChecksumType(), + opt.getBytesPerChecksum()); + if (dataChecksum == null) { + throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt=" + + userOpt + ", default=" + defaultChecksumOpt + + ", effective=null"); + } + return dataChecksum; + } + + @VisibleForTesting + public int getBlockWriteLocateFollowingInitialDelayMs() { + return blockWriteLocateFollowingInitialDelayMs; + } + + /** + * @return the hdfsTimeout + */ + public int getHdfsTimeout() { + return hdfsTimeout; + } + + /** + * @return the maxFailoverAttempts + */ + public int getMaxFailoverAttempts() { + return maxFailoverAttempts; + } + + /** + * @return the maxRetryAttempts + */ + public int getMaxRetryAttempts() { + return maxRetryAttempts; + } + + /** + * @return the failoverSleepBaseMillis + */ + public int getFailoverSleepBaseMillis() { + return failoverSleepBaseMillis; + } + + /** + * @return the failoverSleepMaxMillis + */ + public int getFailoverSleepMaxMillis() { + return failoverSleepMaxMillis; + } + + /** + * @return the maxBlockAcquireFailures + */ + public int getMaxBlockAcquireFailures() { + return maxBlockAcquireFailures; + } + + /** + * @return the datanodeSocketWriteTimeout + */ + public int getDatanodeSocketWriteTimeout() { + return datanodeSocketWriteTimeout; + } + + /** + * @return the ioBufferSize + */ + public int getIoBufferSize() { + return ioBufferSize; + } + + /** + * @return the defaultChecksumOpt + */ + public ChecksumOpt getDefaultChecksumOpt() { + return defaultChecksumOpt; + } + + /** + * @return the writePacketSize + */ + public int getWritePacketSize() { + return writePacketSize; + } + + /** + * @return the writeMaxPackets + */ + public int getWriteMaxPackets() { + return writeMaxPackets; + } + + /** + * @return the writeByteArrayManagerConf + */ + public ByteArrayManager.Conf getWriteByteArrayManagerConf() { + return writeByteArrayManagerConf; + } + + /** + * @return the socketTimeout + */ + public int getSocketTimeout() { + return socketTimeout; + } + + /** + * @return the excludedNodesCacheExpiry + */ + public long getExcludedNodesCacheExpiry() { + return excludedNodesCacheExpiry; + } + + /** + * @return the timeWindow + */ + public int getTimeWindow() { + return timeWindow; + } + + /** + * @return the numCachedConnRetry + */ + public int getNumCachedConnRetry() { + return numCachedConnRetry; + } + + /** + * @return the numBlockWriteRetry + */ + public int getNumBlockWriteRetry() { + return numBlockWriteRetry; + } + + /** + * @return the numBlockWriteLocateFollowingRetry + */ + public int getNumBlockWriteLocateFollowingRetry() { + return numBlockWriteLocateFollowingRetry; + } + + /** + * @return the defaultBlockSize + */ + public long getDefaultBlockSize() { + return defaultBlockSize; + } + + /** + * @return the prefetchSize + */ + public long getPrefetchSize() { + return prefetchSize; + } + + /** + * @return the defaultReplication + */ + public short getDefaultReplication() { + return defaultReplication; + } + + /** + * @return the taskId + */ + public String getTaskId() { + return taskId; + } + + /** + * @return the uMask + */ + public FsPermission getUMask() { + return uMask; + } + + /** + * @return the connectToDnViaHostname + */ + public boolean isConnectToDnViaHostname() { + return connectToDnViaHostname; + } + + /** + * @return the retryTimesForGetLastBlockLength + */ + public int getRetryTimesForGetLastBlockLength() { + return retryTimesForGetLastBlockLength; + } + + /** + * @return the retryIntervalForGetLastBlockLength + */ + public int getRetryIntervalForGetLastBlockLength() { + return retryIntervalForGetLastBlockLength; + } + + /** + * @return the datanodeRestartTimeout + */ + public long getDatanodeRestartTimeout() { + return datanodeRestartTimeout; + } + + /** + * @return the slowIoWarningThresholdMs + */ + public long getSlowIoWarningThresholdMs() { + return slowIoWarningThresholdMs; + } + + /** + * @return the hedgedReadThresholdMillis + */ + public long getHedgedReadThresholdMillis() { + return hedgedReadThresholdMillis; + } + + /** + * @return the hedgedReadThreadpoolSize + */ + public int getHedgedReadThreadpoolSize() { + return hedgedReadThreadpoolSize; + } + + /** ++ * @return the stripedReadThreadpoolSize ++ */ ++ public int getStripedReadThreadpoolSize() { ++ return stripedReadThreadpoolSize; ++ } ++ ++ /** + * @return the replicaAccessorBuilderClasses + */ + public List<Class<? extends ReplicaAccessorBuilder>> + getReplicaAccessorBuilderClasses() { + return replicaAccessorBuilderClasses; + } + + /** + * @return the shortCircuitConf + */ + public ShortCircuitConf getShortCircuitConf() { + return shortCircuitConf; + } + + /** + * Configuration for short-circuit reads. + */ + public static class ShortCircuitConf { + private static final Logger LOG = DfsClientConf.LOG; + + private final int socketCacheCapacity; + private final long socketCacheExpiry; + + private final boolean useLegacyBlockReader; + private final boolean useLegacyBlockReaderLocal; + private final String domainSocketPath; + private final boolean skipShortCircuitChecksums; + + private final int shortCircuitBufferSize; + private final boolean shortCircuitLocalReads; + private final boolean domainSocketDataTraffic; + private final int shortCircuitStreamsCacheSize; + private final long shortCircuitStreamsCacheExpiryMs; + private final int shortCircuitSharedMemoryWatcherInterruptCheckMs; + + private final boolean shortCircuitMmapEnabled; + private final int shortCircuitMmapCacheSize; + private final long shortCircuitMmapCacheExpiryMs; + private final long shortCircuitMmapCacheRetryTimeout; + private final long shortCircuitCacheStaleThresholdMs; + + private final long keyProviderCacheExpiryMs; + + public ShortCircuitConf(Configuration conf) { + socketCacheCapacity = conf.getInt( + DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, + DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT); + socketCacheExpiry = conf.getLong( + DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, + DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT); + + useLegacyBlockReader = conf.getBoolean( + DFS_CLIENT_USE_LEGACY_BLOCKREADER, + DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT); + useLegacyBlockReaderLocal = conf.getBoolean( + DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, + DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT); + shortCircuitLocalReads = conf.getBoolean( + Read.ShortCircuit.KEY, + Read.ShortCircuit.DEFAULT); + domainSocketDataTraffic = conf.getBoolean( + DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, + DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT); + domainSocketPath = conf.getTrimmed( + DFS_DOMAIN_SOCKET_PATH_KEY, + DFS_DOMAIN_SOCKET_PATH_DEFAULT); + + LOG.debug(DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL + + " = {}", useLegacyBlockReaderLocal); + LOG.debug(Read.ShortCircuit.KEY + + " = {}", shortCircuitLocalReads); + LOG.debug(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC + + " = {}", domainSocketDataTraffic); + LOG.debug(DFS_DOMAIN_SOCKET_PATH_KEY + + " = {}", domainSocketPath); + + skipShortCircuitChecksums = conf.getBoolean( + Read.ShortCircuit.SKIP_CHECKSUM_KEY, + Read.ShortCircuit.SKIP_CHECKSUM_DEFAULT); + shortCircuitBufferSize = conf.getInt( + Read.ShortCircuit.BUFFER_SIZE_KEY, + Read.ShortCircuit.BUFFER_SIZE_DEFAULT); + shortCircuitStreamsCacheSize = conf.getInt( + Read.ShortCircuit.STREAMS_CACHE_SIZE_KEY, + Read.ShortCircuit.STREAMS_CACHE_SIZE_DEFAULT); + shortCircuitStreamsCacheExpiryMs = conf.getLong( + Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY, + Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_DEFAULT); + shortCircuitMmapEnabled = conf.getBoolean( + Mmap.ENABLED_KEY, + Mmap.ENABLED_DEFAULT); + shortCircuitMmapCacheSize = conf.getInt( + Mmap.CACHE_SIZE_KEY, + Mmap.CACHE_SIZE_DEFAULT); + shortCircuitMmapCacheExpiryMs = conf.getLong( + Mmap.CACHE_TIMEOUT_MS_KEY, + Mmap.CACHE_TIMEOUT_MS_DEFAULT); + shortCircuitMmapCacheRetryTimeout = conf.getLong( + Mmap.RETRY_TIMEOUT_MS_KEY, + Mmap.RETRY_TIMEOUT_MS_DEFAULT); + shortCircuitCacheStaleThresholdMs = conf.getLong( + ShortCircuit.REPLICA_STALE_THRESHOLD_MS_KEY, + ShortCircuit.REPLICA_STALE_THRESHOLD_MS_DEFAULT); + shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt( + DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, + DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT); + + keyProviderCacheExpiryMs = conf.getLong( + DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS, + DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT); + } + + /** + * @return the socketCacheCapacity + */ + public int getSocketCacheCapacity() { + return socketCacheCapacity; + } + + /** + * @return the socketCacheExpiry + */ + public long getSocketCacheExpiry() { + return socketCacheExpiry; + } + + public boolean isUseLegacyBlockReaderLocal() { + return useLegacyBlockReaderLocal; + } + + public String getDomainSocketPath() { + return domainSocketPath; + } + + public boolean isShortCircuitLocalReads() { + return shortCircuitLocalReads; + } + + public boolean isDomainSocketDataTraffic() { + return domainSocketDataTraffic; + } + /** + * @return the useLegacyBlockReader + */ + public boolean isUseLegacyBlockReader() { + return useLegacyBlockReader; + } + + /** + * @return the skipShortCircuitChecksums + */ + public boolean isSkipShortCircuitChecksums() { + return skipShortCircuitChecksums; + } + + /** + * @return the shortCircuitBufferSize + */ + public int getShortCircuitBufferSize() { + return shortCircuitBufferSize; + } + + /** + * @return the shortCircuitStreamsCacheSize + */ + public int getShortCircuitStreamsCacheSize() { + return shortCircuitStreamsCacheSize; + } + + /** + * @return the shortCircuitStreamsCacheExpiryMs + */ + public long getShortCircuitStreamsCacheExpiryMs() { + return shortCircuitStreamsCacheExpiryMs; + } + + /** + * @return the shortCircuitSharedMemoryWatcherInterruptCheckMs + */ + public int getShortCircuitSharedMemoryWatcherInterruptCheckMs() { + return shortCircuitSharedMemoryWatcherInterruptCheckMs; + } + + /** + * @return the shortCircuitMmapEnabled + */ + public boolean isShortCircuitMmapEnabled() { + return shortCircuitMmapEnabled; + } + + /** + * @return the shortCircuitMmapCacheSize + */ + public int getShortCircuitMmapCacheSize() { + return shortCircuitMmapCacheSize; + } + + /** + * @return the shortCircuitMmapCacheExpiryMs + */ + public long getShortCircuitMmapCacheExpiryMs() { + return shortCircuitMmapCacheExpiryMs; + } + + /** + * @return the shortCircuitMmapCacheRetryTimeout + */ + public long getShortCircuitMmapCacheRetryTimeout() { + return shortCircuitMmapCacheRetryTimeout; + } + + /** + * @return the shortCircuitCacheStaleThresholdMs + */ + public long getShortCircuitCacheStaleThresholdMs() { + return shortCircuitCacheStaleThresholdMs; + } + + /** + * @return the keyProviderCacheExpiryMs + */ + public long getKeyProviderCacheExpiryMs() { + return keyProviderCacheExpiryMs; + } + + public String confAsString() { + + return "shortCircuitStreamsCacheSize = " + + shortCircuitStreamsCacheSize + + ", shortCircuitStreamsCacheExpiryMs = " + + shortCircuitStreamsCacheExpiryMs + + ", shortCircuitMmapCacheSize = " + + shortCircuitMmapCacheSize + + ", shortCircuitMmapCacheExpiryMs = " + + shortCircuitMmapCacheExpiryMs + + ", shortCircuitMmapCacheRetryTimeout = " + + shortCircuitMmapCacheRetryTimeout + + ", shortCircuitCacheStaleThresholdMs = " + + shortCircuitCacheStaleThresholdMs + + ", socketCacheCapacity = " + + socketCacheCapacity + + ", socketCacheExpiry = " + + socketCacheExpiry + + ", shortCircuitLocalReads = " + + shortCircuitLocalReads + + ", useLegacyBlockReaderLocal = " + + useLegacyBlockReaderLocal + + ", domainSocketDataTraffic = " + + domainSocketDataTraffic + + ", shortCircuitSharedMemoryWatcherInterruptCheckMs = " + + shortCircuitSharedMemoryWatcherInterruptCheckMs + + ", keyProviderCacheExpiryMs = " + + keyProviderCacheExpiryMs; + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java index 3d19ab9,d5f4d53..7f45132 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java @@@ -78,17 -78,13 +78,24 @@@ public final class HdfsConstants public static final String CLIENT_NAMENODE_PROTOCOL_NAME = "org.apache.hadoop.hdfs.protocol.ClientProtocol"; + /* + * These values correspond to the values used by the system default erasure + * coding policy. + * TODO: get these values from ec policy of the associated INodeFile + */ + + public static final byte NUM_DATA_BLOCKS = 6; + public static final byte NUM_PARITY_BLOCKS = 3; + // The chunk size for striped block which is used by erasure coding + public static final int BLOCK_STRIPED_CELL_SIZE = 64 * 1024; + + // Timeouts for communicating with DataNode for streaming writes/reads + public static final int READ_TIMEOUT = 60 * 1000; + public static final int READ_TIMEOUT_EXTENSION = 5 * 1000; + public static final int WRITE_TIMEOUT = 8 * 60 * 1000; + //for write pipeline + public static final int WRITE_TIMEOUT_EXTENSION = 5 * 1000; + // SafeMode actions public enum SafeModeAction { SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java index 0000000,e135d8e..f908dd3 mode 000000,100644..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java @@@ -1,0 -1,120 +1,126 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hdfs; + + import java.io.IOException; + import java.nio.ByteBuffer; + import java.util.EnumSet; + + import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.fs.ReadOption; + import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; ++import org.apache.hadoop.util.DataChecksum; + + /** + * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from + * replicas. + */ + @InterfaceAudience.Private + public final class ExternalBlockReader implements BlockReader { + private final ReplicaAccessor accessor; + private final long visibleLength; + private long pos; + + ExternalBlockReader(ReplicaAccessor accessor, long visibleLength, + long startOffset) { + this.accessor = accessor; + this.visibleLength = visibleLength; + this.pos = startOffset; + } + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + int nread = accessor.read(pos, buf, off, len); + pos += nread; + return nread; + } + + @Override + public int read(ByteBuffer buf) throws IOException { + int nread = accessor.read(pos, buf); + pos += nread; + return nread; + } + + @Override + public long skip(long n) throws IOException { + // You cannot skip backwards + if (n <= 0) { + return 0; + } + // You can't skip past the end of the replica. + long oldPos = pos; + pos += n; + if (pos > visibleLength) { + pos = visibleLength; + } + return pos - oldPos; + } + + @Override + public int available() throws IOException { + // We return the amount of bytes that we haven't read yet from the + // replica, based on our current position. Some of the other block + // readers return a shorter length than that. The only advantage to + // returning a shorter length is that the DFSInputStream will + // trash your block reader and create a new one if someone tries to + // seek() beyond the available() region. + long diff = visibleLength - pos; + if (diff > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } else { + return (int)diff; + } + } + + @Override + public void close() throws IOException { + accessor.close(); + } + + @Override + public void readFully(byte[] buf, int offset, int len) throws IOException { + BlockReaderUtil.readFully(this, buf, offset, len); + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return BlockReaderUtil.readAll(this, buf, offset, len); + } + + @Override + public boolean isLocal() { + return accessor.isLocal(); + } + + @Override + public boolean isShortCircuit() { + return accessor.isShortCircuit(); + } + + @Override + public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { + // For now, pluggable ReplicaAccessors do not support zero-copy. + return null; + } ++ ++ @Override ++ public DataChecksum getDataChecksum() { ++ return null; ++ } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index c083b5e,887accf..f292ee8 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@@ -445,17 -415,12 +431,17 @@@ public class PBHelper } public static BlockWithLocationsProto convert(BlockWithLocations blk) { - return BlockWithLocationsProto.newBuilder() - .setBlock(convert(blk.getBlock())) + BlockWithLocationsProto.Builder builder = BlockWithLocationsProto + .newBuilder().setBlock(convert(blk.getBlock())) .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids())) .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())) - .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes())); - .addAllStorageTypes(PBHelperClient.convertStorageTypes(blk.getStorageTypes())) - .build(); ++ .addAllStorageTypes(PBHelperClient.convertStorageTypes(blk.getStorageTypes())); + if (blk instanceof StripedBlockWithLocations) { + StripedBlockWithLocations sblk = (StripedBlockWithLocations) blk; + builder.setIndices(getByteString(sblk.getIndices())); + builder.setDataBlockNum(sblk.getDataBlockNum()); + } + return builder.build(); } public static BlockWithLocations convert(BlockWithLocationsProto b) { @@@ -806,26 -697,17 +723,26 @@@ StorageType[] storageTypes = b.getStorageTypes(); if (storageTypes != null) { - for (int i = 0; i < storageTypes.length; ++i) { - builder.addStorageTypes(PBHelperClient.convertStorageType(storageTypes[i])); + for (StorageType storageType : storageTypes) { - builder.addStorageTypes(PBHelper.convertStorageType(storageType)); ++ builder.addStorageTypes(PBHelperClient.convertStorageType(storageType)); } } final String[] storageIDs = b.getStorageIDs(); if (storageIDs != null) { builder.addAllStorageIDs(Arrays.asList(storageIDs)); } + if (b instanceof LocatedStripedBlock) { + LocatedStripedBlock sb = (LocatedStripedBlock) b; + int[] indices = sb.getBlockIndices(); + Token<BlockTokenIdentifier>[] blockTokens = sb.getBlockTokens(); + for (int i = 0; i < indices.length; i++) { + builder.addBlockIndex(indices[i]); - builder.addBlockTokens(PBHelper.convert(blockTokens[i])); ++ builder.addBlockTokens(PBHelperClient.convert(blockTokens[i])); + } + } - return builder.setB(PBHelper.convert(b.getBlock())) - .setBlockToken(PBHelper.convert(b.getBlockToken())) + return builder.setB(PBHelperClient.convert(b.getBlock())) + .setBlockToken(PBHelperClient.convert(b.getBlockToken())) .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build(); } @@@ -3144,191 -2880,4 +2954,192 @@@ setLeaseId(context.getLeaseId()). build(); } + + public static ECSchema convertECSchema(ECSchemaProto schema) { + List<ECSchemaOptionEntryProto> optionsList = schema.getOptionsList(); + Map<String, String> options = new HashMap<>(optionsList.size()); + for (ECSchemaOptionEntryProto option : optionsList) { + options.put(option.getKey(), option.getValue()); + } + return new ECSchema(schema.getCodecName(), schema.getDataUnits(), + schema.getParityUnits(), options); + } + + public static ECSchemaProto convertECSchema(ECSchema schema) { + ECSchemaProto.Builder builder = ECSchemaProto.newBuilder() + .setCodecName(schema.getCodecName()) + .setDataUnits(schema.getNumDataUnits()) + .setParityUnits(schema.getNumParityUnits()); + Set<Entry<String, String>> entrySet = schema.getExtraOptions().entrySet(); + for (Entry<String, String> entry : entrySet) { + builder.addOptions(ECSchemaOptionEntryProto.newBuilder() + .setKey(entry.getKey()).setValue(entry.getValue()).build()); + } + return builder.build(); + } + + public static ErasureCodingPolicy convertErasureCodingPolicy( + ErasureCodingPolicyProto policy) { + return new ErasureCodingPolicy(policy.getName(), + convertECSchema(policy.getSchema()), + policy.getCellSize()); + } + + public static ErasureCodingPolicyProto convertErasureCodingPolicy( + ErasureCodingPolicy policy) { + ErasureCodingPolicyProto.Builder builder = ErasureCodingPolicyProto + .newBuilder() + .setName(policy.getName()) + .setSchema(convertECSchema(policy.getSchema())) + .setCellSize(policy.getCellSize()); + return builder.build(); + } + + public static ErasureCodingZoneProto convertErasureCodingZone( + ErasureCodingZone ecZone) { + return ErasureCodingZoneProto.newBuilder().setDir(ecZone.getDir()) + .setEcPolicy(convertErasureCodingPolicy(ecZone.getErasureCodingPolicy())) + .build(); + } + + public static ErasureCodingZone convertErasureCodingZone( + ErasureCodingZoneProto ecZoneProto) { + return new ErasureCodingZone(ecZoneProto.getDir(), + convertErasureCodingPolicy(ecZoneProto.getEcPolicy())); + } + + public static BlockECRecoveryInfo convertBlockECRecoveryInfo( + BlockECRecoveryInfoProto blockEcRecoveryInfoProto) { + ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock(); + ExtendedBlock block = convert(blockProto); + + DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto + .getSourceDnInfos(); + DatanodeInfo[] sourceDnInfos = convert(sourceDnInfosProto); + + DatanodeInfosProto targetDnInfosProto = blockEcRecoveryInfoProto + .getTargetDnInfos(); + DatanodeInfo[] targetDnInfos = convert(targetDnInfosProto); + + StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto + .getTargetStorageUuids(); + String[] targetStorageUuids = convert(targetStorageUuidsProto); + + StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto + .getTargetStorageTypes(); + StorageType[] convertStorageTypes = convertStorageTypes( + targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto + .getStorageTypesList().size()); + + List<Integer> liveBlockIndicesList = blockEcRecoveryInfoProto + .getLiveBlockIndicesList(); + short[] liveBlkIndices = new short[liveBlockIndicesList.size()]; + for (int i = 0; i < liveBlockIndicesList.size(); i++) { + liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue(); + } + + ErasureCodingPolicy ecPolicy = + convertErasureCodingPolicy(blockEcRecoveryInfoProto.getEcPolicy()); + + return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos, + targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy); + } + + public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo( + BlockECRecoveryInfo blockEcRecoveryInfo) { + BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto + .newBuilder(); - builder.setBlock(convert(blockEcRecoveryInfo.getExtendedBlock())); ++ builder.setBlock(PBHelperClient.convert( ++ blockEcRecoveryInfo.getExtendedBlock())); + + DatanodeInfo[] sourceDnInfos = blockEcRecoveryInfo.getSourceDnInfos(); + builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos)); + + DatanodeInfo[] targetDnInfos = blockEcRecoveryInfo.getTargetDnInfos(); + builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos)); + + String[] targetStorageIDs = blockEcRecoveryInfo.getTargetStorageIDs(); + builder.setTargetStorageUuids(convertStorageIDs(targetStorageIDs)); + + StorageType[] targetStorageTypes = blockEcRecoveryInfo + .getTargetStorageTypes(); + builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes)); + + short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices(); + builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices)); + + builder.setEcPolicy(convertErasureCodingPolicy(blockEcRecoveryInfo + .getErasureCodingPolicy())); + + return builder.build(); + } + + private static List<Integer> convertIntArray(short[] liveBlockIndices) { + List<Integer> liveBlockIndicesList = new ArrayList<Integer>(); + for (short s : liveBlockIndices) { + liveBlockIndicesList.add((int) s); + } + return liveBlockIndicesList; + } + + private static StorageTypesProto convertStorageTypesProto( + StorageType[] targetStorageTypes) { + StorageTypesProto.Builder builder = StorageTypesProto.newBuilder(); + for (StorageType storageType : targetStorageTypes) { - builder.addStorageTypes(convertStorageType(storageType)); ++ builder.addStorageTypes(PBHelperClient.convertStorageType(storageType)); + } + return builder.build(); + } + + private static StorageUuidsProto convertStorageIDs(String[] targetStorageIDs) { + StorageUuidsProto.Builder builder = StorageUuidsProto.newBuilder(); + for (String storageUuid : targetStorageIDs) { + builder.addStorageUuids(storageUuid); + } + return builder.build(); + } + + private static DatanodeInfosProto convertToDnInfosProto(DatanodeInfo[] dnInfos) { + DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder(); + for (DatanodeInfo datanodeInfo : dnInfos) { - builder.addDatanodes(convert(datanodeInfo)); ++ builder.addDatanodes(PBHelperClient.convert(datanodeInfo)); + } + return builder.build(); + } + + private static String[] convert(StorageUuidsProto targetStorageUuidsProto) { + List<String> storageUuidsList = targetStorageUuidsProto + .getStorageUuidsList(); + String[] storageUuids = new String[storageUuidsList.size()]; + for (int i = 0; i < storageUuidsList.size(); i++) { + storageUuids[i] = storageUuidsList.get(i); + } + return storageUuids; + } + + public static BlockECRecoveryCommandProto convert( + BlockECRecoveryCommand blkECRecoveryCmd) { + BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto + .newBuilder(); + Collection<BlockECRecoveryInfo> blockECRecoveryInfos = blkECRecoveryCmd + .getECTasks(); + for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) { + builder + .addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo)); + } + return builder.build(); + } + + public static BlockECRecoveryCommand convert( + BlockECRecoveryCommandProto blkECRecoveryCmdProto) { + Collection<BlockECRecoveryInfo> blkECRecoveryInfos = new ArrayList<BlockECRecoveryInfo>(); + List<BlockECRecoveryInfoProto> blockECRecoveryinfoList = blkECRecoveryCmdProto + .getBlockECRecoveryinfoList(); + for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) { + blkECRecoveryInfos + .add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto)); + } + return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, + blkECRecoveryInfos); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 555f506,be1a9ef..07c3c01 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@@ -17,8 -17,7 +17,8 @@@ */ package org.apache.hadoop.hdfs.server.balancer; - import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; + import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@@ -64,10 -62,8 +63,9 @@@ import org.apache.hadoop.hdfs.protocol. import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup; - import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; @@@ -857,10 -794,14 +854,14 @@@ public class Dispatcher if (shouldFetchMoreBlocks()) { // fetch new blocks try { - blocksToReceive -= getBlockList(); + final long received = getBlockList(); + if (received == 0) { + return; + } + blocksToReceive -= received; continue; } catch (IOException e) { - LOG.warn("Exception while getting block list", e); + LOG.warn("Exception while getting reportedBlock list", e); return; } } else { http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java index 928424b,95d9983..2f214be --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java @@@ -55,12 -55,6 +55,12 @@@ public interface BlockCollection public long getPreferredBlockSize(); /** + * Get block replication for the collection. + * @return block replication value. Return 0 if the file is erasure coded. + */ + public short getPreferredBlockReplication(); + - /** ++ /** * @return the storage policy ID. */ public byte getStoragePolicyID(); @@@ -88,7 -81,7 +88,12 @@@ public boolean isUnderConstruction(); /** + * @return whether the block collection is in striping format + */ - public boolean isStriped(); ++ boolean isStriped(); ++ ++ /** + * @return the id for the block collection + */ + long getId(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java index f440e14,706cbcd..dc296ac --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java @@@ -19,25 -19,39 +19,38 @@@ package org.apache.hadoop.hdfs.server.b import java.io.IOException; import java.util.LinkedList; + import java.util.List; import com.google.common.base.Preconditions; -import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; + import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.util.LightWeightGSet; + import static org.apache.hadoop.hdfs.server.namenode.INodeId.INVALID_INODE_ID; + /** - * BlockInfo class maintains for a given block - * the {@link BlockCollection} it is part of and datanodes where the replicas of - * the block are stored. + * For a given block (or an erasure coding block group), BlockInfo class + * maintains 1) the {@link BlockCollection} it is part of, and 2) datanodes + * where the replicas of the block, or blocks belonging to the erasure coding + * block group, are stored. */ -@InterfaceAudience.Private -public abstract class BlockInfo extends Block +public abstract class BlockInfo extends Block implements LightWeightGSet.LinkedElement { + public static final BlockInfo[] EMPTY_ARRAY = {}; - private BlockCollection bc; + /** + * Replication factor. + */ + private short replication; + + /** + * Block collection ID. + */ + private long bcId; - /** For implementing {@link LightWeightGSet.LinkedElement} interface */ + /** For implementing {@link LightWeightGSet.LinkedElement} interface. */ private LightWeightGSet.LinkedElement nextLinkedElement; /** @@@ -58,26 -72,48 +71,40 @@@ /** * Construct an entry for blocksmap - * @param replication the block's replication factor + * @param size the block's replication factor, or the total number of blocks + * in the block group */ - public BlockInfo(short replication) { - this.triplets = new Object[3*replication]; + public BlockInfo(short size) { + this.triplets = new Object[3 * size]; - this.bc = null; + this.bcId = INVALID_INODE_ID; - this.replication = replication; ++ this.replication = isStriped() ? 0 : size; } - public BlockInfo(Block blk, short replication) { + public BlockInfo(Block blk, short size) { super(blk); - this.triplets = new Object[3 * size]; - this.bc = null; - this.triplets = new Object[3*replication]; ++ this.triplets = new Object[3*size]; + this.bcId = INVALID_INODE_ID; - this.replication = replication; - } - - /** - * Copy construction. - * @param from BlockInfo to copy from. - */ - protected BlockInfo(BlockInfo from) { - this(from, from.getReplication()); - this.bcId = from.bcId; ++ this.replication = isStriped() ? 0 : size; + } + + public short getReplication() { + return replication; } - public BlockCollection getBlockCollection() { - return bc; + public void setReplication(short repl) { + this.replication = repl; } - public void setBlockCollection(BlockCollection bc) { - this.bc = bc; + public long getBlockCollectionId() { + return bcId; + } + + public void setBlockCollectionId(long id) { + this.bcId = id; + } + + public boolean isDeleted() { + return bcId == INVALID_INODE_ID; } public DatanodeDescriptor getDatanode(int index) { @@@ -342,7 -363,7 +365,8 @@@ public void convertToBlockUnderConstruction(BlockUCState s, DatanodeStorageInfo[] targets) { if (isComplete()) { - uc = new BlockUnderConstructionFeature(this, s, targets, this.isStriped()); - uc = new BlockUnderConstructionFeature(this, s, targets); ++ uc = new BlockUnderConstructionFeature(this, s, targets, ++ this.isStriped()); } else { // the block is already under construction uc.setBlockUCState(s); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java index 12b4fd3,42934c3..b9d8486 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java @@@ -35,17 -36,14 +35,6 @@@ public class BlockInfoContiguous extend } /** - * Copy construction. This is used to convert - * BlockReplicationInfoUnderConstruction - * - * @param from BlockReplicationInfo to copy from. - * Copy construction. - * @param from BlockInfoContiguous to copy from. -- */ -- protected BlockInfoContiguous(BlockInfoContiguous from) { - this(from, (short) (from.triplets.length / 3)); - this.setBlockCollection(from.getBlockCollection()); - super(from); -- } -- -- /** * Ensure that there is enough space to include num more triplets. * @return first free triplet index. */