HDFS-8053. Move DFSIn/OutputStream and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/94cbb6d1 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/94cbb6d1 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/94cbb6d1 Branch: refs/heads/branch-2 Commit: 94cbb6d16483ba011b7104565b4084bf2a3eb6e6 Parents: b46e4ce Author: Haohui Mai <[email protected]> Authored: Sat Sep 26 11:08:25 2015 -0700 Committer: Haohui Mai <[email protected]> Committed: Sat Sep 26 11:16:50 2015 -0700 ---------------------------------------------------------------------- .../dev-support/findbugsExcludeFile.xml | 24 + .../apache/hadoop/fs/BlockStorageLocation.java | 52 + .../org/apache/hadoop/fs/HdfsBlockLocation.java | 47 + .../java/org/apache/hadoop/fs/HdfsVolumeId.java | 73 + .../java/org/apache/hadoop/fs/VolumeId.java | 40 + .../hadoop/hdfs/BlockMissingException.java | 65 + .../apache/hadoop/hdfs/BlockReaderFactory.java | 893 +++++ .../hadoop/hdfs/BlockStorageLocationUtil.java | 369 ++ .../java/org/apache/hadoop/hdfs/DFSClient.java | 3203 ++++++++++++++++++ .../hadoop/hdfs/DFSClientFaultInjector.java | 60 + .../hadoop/hdfs/DFSHedgedReadMetrics.java | 58 + .../hadoop/hdfs/DFSInotifyEventInputStream.java | 239 ++ .../org/apache/hadoop/hdfs/DFSInputStream.java | 1915 +++++++++++ .../org/apache/hadoop/hdfs/DFSOutputStream.java | 923 +++++ .../java/org/apache/hadoop/hdfs/DFSPacket.java | 345 ++ .../org/apache/hadoop/hdfs/DFSUtilClient.java | 24 + .../org/apache/hadoop/hdfs/DataStreamer.java | 1904 +++++++++++ .../hadoop/hdfs/HdfsConfigurationLoader.java | 44 + .../apache/hadoop/hdfs/RemotePeerFactory.java | 43 + .../hdfs/UnknownCipherSuiteException.java | 35 + .../UnknownCryptoProtocolVersionException.java | 38 + .../org/apache/hadoop/hdfs/XAttrHelper.java | 174 + .../hadoop/hdfs/client/HdfsDataInputStream.java | 113 + .../hdfs/client/HdfsDataOutputStream.java | 112 + .../hadoop/hdfs/client/impl/LeaseRenewer.java | 524 +++ 
.../hdfs/inotify/MissingEventsException.java | 54 + .../hadoop/hdfs/protocol/AclException.java | 39 + .../hdfs/protocol/CacheDirectiveIterator.java | 130 + .../hadoop/hdfs/protocol/CachePoolIterator.java | 63 + .../hdfs/protocol/EncryptionZoneIterator.java | 64 + .../QuotaByStorageTypeExceededException.java | 56 + .../hdfs/protocol/UnresolvedPathException.java | 87 + .../datatransfer/ReplaceDatanodeOnFailure.java | 200 ++ .../datanode/ReplicaNotFoundException.java | 53 + .../namenode/RetryStartFileException.java | 36 + hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../dev-support/findbugsExcludeFile.xml | 19 - .../apache/hadoop/fs/BlockStorageLocation.java | 52 - .../org/apache/hadoop/fs/HdfsBlockLocation.java | 47 - .../java/org/apache/hadoop/fs/HdfsVolumeId.java | 73 - .../java/org/apache/hadoop/fs/VolumeId.java | 40 - .../hadoop/hdfs/BlockMissingException.java | 65 - .../apache/hadoop/hdfs/BlockReaderFactory.java | 892 ----- .../hadoop/hdfs/BlockStorageLocationUtil.java | 369 -- .../java/org/apache/hadoop/hdfs/DFSClient.java | 3200 ----------------- .../hadoop/hdfs/DFSClientFaultInjector.java | 57 - .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 - .../hadoop/hdfs/DFSHedgedReadMetrics.java | 58 - .../hadoop/hdfs/DFSInotifyEventInputStream.java | 239 -- .../org/apache/hadoop/hdfs/DFSInputStream.java | 1915 ----------- .../org/apache/hadoop/hdfs/DFSOutputStream.java | 923 ----- .../java/org/apache/hadoop/hdfs/DFSPacket.java | 345 -- .../java/org/apache/hadoop/hdfs/DFSUtil.java | 23 - .../org/apache/hadoop/hdfs/DataStreamer.java | 1903 ----------- .../apache/hadoop/hdfs/HdfsConfiguration.java | 11 +- .../apache/hadoop/hdfs/RemotePeerFactory.java | 43 - .../hdfs/UnknownCipherSuiteException.java | 35 - .../UnknownCryptoProtocolVersionException.java | 38 - .../org/apache/hadoop/hdfs/XAttrHelper.java | 174 - .../hadoop/hdfs/client/HdfsDataInputStream.java | 113 - .../hdfs/client/HdfsDataOutputStream.java | 112 - .../hadoop/hdfs/client/impl/LeaseRenewer.java | 524 
--- .../hdfs/inotify/MissingEventsException.java | 54 - .../hadoop/hdfs/protocol/AclException.java | 39 - .../hdfs/protocol/CacheDirectiveIterator.java | 130 - .../hadoop/hdfs/protocol/CachePoolIterator.java | 63 - .../hdfs/protocol/EncryptionZoneIterator.java | 64 - .../QuotaByStorageTypeExceededException.java | 56 - .../hdfs/protocol/UnresolvedPathException.java | 87 - .../datatransfer/ReplaceDatanodeOnFailure.java | 200 -- .../hadoop/hdfs/server/balancer/Dispatcher.java | 3 +- .../hdfs/server/datanode/BlockReceiver.java | 4 +- .../hdfs/server/datanode/BlockSender.java | 4 +- .../hadoop/hdfs/server/datanode/DataNode.java | 3 +- .../hdfs/server/datanode/DataXceiver.java | 6 +- .../datanode/ReplicaNotFoundException.java | 53 - .../datanode/fsdataset/impl/BlockPoolSlice.java | 4 +- .../datanode/fsdataset/impl/FsDatasetImpl.java | 8 +- .../impl/RamDiskAsyncLazyPersistService.java | 4 +- .../namenode/RetryStartFileException.java | 36 - .../hdfs/server/namenode/TransferFsImage.java | 4 +- .../datanode/TestFiDataTransferProtocol2.java | 1 - .../org/apache/hadoop/hdfs/DFSTestUtil.java | 2 +- .../hdfs/MiniDFSClusterWithNodeGroup.java | 2 +- .../hadoop/hdfs/TestBlockStoragePolicy.java | 1 - .../TestClientProtocolForPipelineRecovery.java | 6 +- .../apache/hadoop/hdfs/TestCrcCorruption.java | 2 +- .../org/apache/hadoop/hdfs/TestDFSUtil.java | 8 +- .../org/apache/hadoop/hdfs/TestFileStatus.java | 2 +- .../java/org/apache/hadoop/hdfs/TestPread.java | 10 +- 90 files changed, 12143 insertions(+), 12087 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml index 515da24..41a8564 100644 
--- a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml @@ -32,4 +32,28 @@ <Method name="allocSlot" /> <Bug pattern="UL_UNRELEASED_LOCK" /> </Match> + <Match> + <Class name="org.apache.hadoop.hdfs.DFSInputStream"/> + <Field name="tcpReadsDisabledForTesting"/> + <Bug pattern="MS_SHOULD_BE_FINAL"/> + </Match> + + <!-- + ResponseProcessor is a thread that is designed to catch RuntimeException. + --> + <Match> + <Class name="org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor" /> + <Method name="run" /> + <Bug pattern="REC_CATCH_EXCEPTION" /> + </Match> + + <!-- + We use a separate lock to guard cachingStrategy in order to separate + locks for p-reads from seek + read invocations. + --> + <Match> + <Class name="org.apache.hadoop.hdfs.DFSInputStream" /> + <Field name="cachingStrategy" /> + <Bug pattern="IS2_INCONSISTENT_SYNC" /> + </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java new file mode 100644 index 0000000..2200994 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Wrapper for {@link BlockLocation} that also adds {@link VolumeId} volume + * location information for each replica. + */ [email protected] [email protected] +@Deprecated +public class BlockStorageLocation extends BlockLocation { + + private final VolumeId[] volumeIds; + + public BlockStorageLocation(BlockLocation loc, VolumeId[] volumeIds) + throws IOException { + // Initialize with data from passed in BlockLocation + super(loc.getNames(), loc.getHosts(), loc.getTopologyPaths(), loc + .getOffset(), loc.getLength(), loc.isCorrupt()); + this.volumeIds = volumeIds; + } + + /** + * Gets the list of {@link VolumeId} corresponding to the block's replicas. 
+ * + * @return volumeIds list of VolumeId for the block's replicas + */ + public VolumeId[] getVolumeIds() { + return volumeIds; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java new file mode 100644 index 0000000..0ccacda --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; + +/** + * Wrapper for {@link BlockLocation} that also includes a {@link LocatedBlock}, + * allowing more detailed queries to the datanode about a block. 
+ * + */ [email protected] [email protected] +public class HdfsBlockLocation extends BlockLocation { + + private final LocatedBlock block; + + public HdfsBlockLocation(BlockLocation loc, LocatedBlock block) + throws IOException { + // Initialize with data from passed in BlockLocation + super(loc); + this.block = block; + } + + public LocatedBlock getLocatedBlock() { + return block; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java new file mode 100644 index 0000000..6e9d3d7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.fs; + +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.base.Preconditions; + +/** + * HDFS-specific volume identifier which implements {@link VolumeId}. Can be + * used to differentiate between the data directories on a single datanode. This + * identifier is only unique on a per-datanode basis. + */ [email protected] [email protected] +public class HdfsVolumeId implements VolumeId { + + private final byte[] id; + + public HdfsVolumeId(byte[] id) { + Preconditions.checkNotNull(id, "id cannot be null"); + this.id = id; + } + + @Override + public int compareTo(VolumeId arg0) { + if (arg0 == null) { + return 1; + } + return hashCode() - arg0.hashCode(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(id).toHashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != getClass()) { + return false; + } + if (obj == this) { + return true; + } + HdfsVolumeId that = (HdfsVolumeId) obj; + return new EqualsBuilder().append(this.id, that.id).isEquals(); + } + + @Override + public String toString() { + return StringUtils.byteToHexString(id); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/VolumeId.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/VolumeId.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/VolumeId.java new file mode 100644 index 0000000..e56e304 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/VolumeId.java @@ -0,0 +1,40 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Opaque interface that identifies a disk location. Subclasses + * should implement {@link Comparable} and override both equals and hashCode. 
+ */ [email protected] [email protected] +public interface VolumeId extends Comparable<VolumeId> { + + @Override + abstract public int compareTo(VolumeId arg0); + + @Override + abstract public int hashCode(); + + @Override + abstract public boolean equals(Object obj); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java new file mode 100644 index 0000000..7bba8a4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This exception is thrown when a read encounters a block that has no locations + * associated with it. + */ [email protected] [email protected] +public class BlockMissingException extends IOException { + + private static final long serialVersionUID = 1L; + + private final String filename; + private final long offset; + + /** + * An exception that indicates that file was corrupted. + * @param filename name of corrupted file + * @param description a description of the corruption details + */ + public BlockMissingException(String filename, String description, long offset) { + super(description); + this.filename = filename; + this.offset = offset; + } + + /** + * Returns the name of the corrupted file. + * @return name of corrupted file + */ + public String getFile() { + return filename; + } + + /** + * Returns the offset at which this file is corrupted + * @return offset of corrupted file + */ + public long getOffset() { + return offset; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java new file mode 100644 index 0000000..69e9da2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -0,0 +1,893 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION; + +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.net.InetSocketAddress; +import java.util.List; + +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import org.apache.commons.lang.mutable.MutableBoolean; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; +import org.apache.hadoop.hdfs.net.DomainPeer; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import 
org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; +import org.apache.hadoop.hdfs.util.IOUtilsClient; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.PerformanceAdvisory; +import org.apache.hadoop.util.Time; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Utility class to create BlockReader implementations. 
+ */ [email protected] +public class BlockReaderFactory implements ShortCircuitReplicaCreator { + static final Logger LOG = LoggerFactory.getLogger(BlockReaderFactory.class); + + public static class FailureInjector { + public void injectRequestFileDescriptorsFailure() throws IOException { + // do nothing + } + public boolean getSupportsReceiptVerification() { + return true; + } + } + + @VisibleForTesting + static ShortCircuitReplicaCreator + createShortCircuitReplicaInfoCallback = null; + + private final DfsClientConf conf; + + /** + * Injects failures into specific operations during unit tests. + */ + private static FailureInjector failureInjector = new FailureInjector(); + + /** + * The file name, for logging and debugging purposes. + */ + private String fileName; + + /** + * The block ID and block pool ID to use. + */ + private ExtendedBlock block; + + /** + * The block token to use for security purposes. + */ + private Token<BlockTokenIdentifier> token; + + /** + * The offset within the block to start reading at. + */ + private long startOffset; + + /** + * If false, we won't try to verify the block checksum. + */ + private boolean verifyChecksum; + + /** + * The name of this client. + */ + private String clientName; + + /** + * The DataNode we're talking to. + */ + private DatanodeInfo datanode; + + /** + * StorageType of replica on DataNode. + */ + private StorageType storageType; + + /** + * If false, we won't try short-circuit local reads. + */ + private boolean allowShortCircuitLocalReads; + + /** + * The ClientContext to use for things like the PeerCache. + */ + private ClientContext clientContext; + + /** + * Number of bytes to read. -1 indicates no limit. + */ + private long length = -1; + + /** + * Caching strategy to use when reading the block. + */ + private CachingStrategy cachingStrategy; + + /** + * Socket address to use to connect to peer. 
+ */ + private InetSocketAddress inetSocketAddress; + + /** + * Remote peer factory to use to create a peer, if needed. + */ + private RemotePeerFactory remotePeerFactory; + + /** + * UserGroupInformation to use for legacy block reader local objects, if needed. + */ + private UserGroupInformation userGroupInformation; + + /** + * Configuration to use for legacy block reader local objects, if needed. + */ + private Configuration configuration; + + /** + * Information about the domain socket path we should use to connect to the + * local peer-- or null if we haven't examined the local domain socket. + */ + private DomainSocketFactory.PathInfo pathInfo; + + /** + * The remaining number of times that we'll try to pull a socket out of the + * cache. + */ + private int remainingCacheTries; + + public BlockReaderFactory(DfsClientConf conf) { + this.conf = conf; + this.remainingCacheTries = conf.getNumCachedConnRetry(); + } + + public BlockReaderFactory setFileName(String fileName) { + this.fileName = fileName; + return this; + } + + public BlockReaderFactory setBlock(ExtendedBlock block) { + this.block = block; + return this; + } + + public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) { + this.token = token; + return this; + } + + public BlockReaderFactory setStartOffset(long startOffset) { + this.startOffset = startOffset; + return this; + } + + public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) { + this.verifyChecksum = verifyChecksum; + return this; + } + + public BlockReaderFactory setClientName(String clientName) { + this.clientName = clientName; + return this; + } + + public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) { + this.datanode = datanode; + return this; + } + + public BlockReaderFactory setStorageType(StorageType storageType) { + this.storageType = storageType; + return this; + } + + public BlockReaderFactory setAllowShortCircuitLocalReads( + boolean allowShortCircuitLocalReads) { + 
this.allowShortCircuitLocalReads = allowShortCircuitLocalReads; + return this; + } + + public BlockReaderFactory setClientCacheContext( + ClientContext clientContext) { + this.clientContext = clientContext; + return this; + } + + public BlockReaderFactory setLength(long length) { + this.length = length; + return this; + } + + public BlockReaderFactory setCachingStrategy( + CachingStrategy cachingStrategy) { + this.cachingStrategy = cachingStrategy; + return this; + } + + public BlockReaderFactory setInetSocketAddress ( + InetSocketAddress inetSocketAddress) { + this.inetSocketAddress = inetSocketAddress; + return this; + } + + public BlockReaderFactory setUserGroupInformation( + UserGroupInformation userGroupInformation) { + this.userGroupInformation = userGroupInformation; + return this; + } + + public BlockReaderFactory setRemotePeerFactory( + RemotePeerFactory remotePeerFactory) { + this.remotePeerFactory = remotePeerFactory; + return this; + } + + public BlockReaderFactory setConfiguration( + Configuration configuration) { + this.configuration = configuration; + return this; + } + + @VisibleForTesting + public static void setFailureInjectorForTesting(FailureInjector injector) { + failureInjector = injector; + } + + /** + * Build a BlockReader with the given options. + * + * This function will do the best it can to create a block reader that meets + * all of our requirements. We prefer short-circuit block readers + * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the + * former avoid the overhead of socket communication. If short-circuit is + * unavailable, our next fallback is data transfer over UNIX domain sockets, + * if dfs.client.domain.socket.data.traffic has been enabled. If that doesn't + * work, we will try to create a remote block reader that operates over TCP + * sockets. + * + * There are a few caches that are important here. 
+ * + * The ShortCircuitCache stores file descriptor objects which have been passed + * from the DataNode. + * + * The DomainSocketFactory stores information about UNIX domain socket paths + * that we have not been able to use in the past, so that we don't waste time + * retrying them over and over. (Like all the caches, it does have a timeout, + * though.) + * + * The PeerCache stores peers that we have used in the past. If we can reuse + * one of these peers, we avoid the overhead of re-opening a socket. However, + * if the socket has been timed out on the remote end, our attempt to reuse + * the socket may end with an IOException. For that reason, we limit our + * attempts at socket reuse to dfs.client.cached.conn.retry times. After + * that, we create new sockets. This avoids the problem where a thread tries + * to talk to a peer that it hasn't talked to in a while, and has to clean out + * every entry in a socket cache full of stale entries. + * + * @return The new BlockReader. We will not return null. + * + * @throws InvalidToken + * If the block token was invalid. + * InvalidEncryptionKeyException + * If the encryption key was invalid. + * Other IOException + * If there was another problem. 
+ */ + public BlockReader build() throws IOException { + Preconditions.checkNotNull(configuration); + BlockReader reader = tryToCreateExternalBlockReader(); + if (reader != null) { + return reader; + } + final ShortCircuitConf scConf = conf.getShortCircuitConf(); + if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) { + if (clientContext.getUseLegacyBlockReaderLocal()) { + reader = getLegacyBlockReaderLocal(); + if (reader != null) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": returning new legacy block reader local."); + } + return reader; + } + } else { + reader = getBlockReaderLocal(); + if (reader != null) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": returning new block reader local."); + } + return reader; + } + } + } + if (scConf.isDomainSocketDataTraffic()) { + reader = getRemoteBlockReaderFromDomain(); + if (reader != null) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": returning new remote block reader using " + + "UNIX domain socket on " + pathInfo.getPath()); + } + return reader; + } + } + Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting, + "TCP reads were disabled for testing, but we failed to " + + "do a non-TCP read."); + return getRemoteBlockReaderFromTcp(); + } + + private BlockReader tryToCreateExternalBlockReader() { + List<Class<? extends ReplicaAccessorBuilder>> clses = + conf.getReplicaAccessorBuilderClasses(); + for (Class<? extends ReplicaAccessorBuilder> cls : clses) { + try { + ByteArrayDataOutput bado = ByteStreams.newDataOutput(); + token.write(bado); + byte tokenBytes[] = bado.toByteArray(); + + Constructor<? extends ReplicaAccessorBuilder> ctor = + cls.getConstructor(); + ReplicaAccessorBuilder builder = ctor.newInstance(); + ReplicaAccessor accessor = builder. + setAllowShortCircuitReads(allowShortCircuitLocalReads). + setBlock(block.getBlockId(), block.getBlockPoolId()). + setGenerationStamp(block.getGenerationStamp()). + setBlockAccessToken(tokenBytes). 
+ setClientName(clientName). + setConfiguration(configuration). + setFileName(fileName). + setVerifyChecksum(verifyChecksum). + setVisibleLength(length). + build(); + if (accessor == null) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": No ReplicaAccessor created by " + + cls.getName()); + } + } else { + return new ExternalBlockReader(accessor, length, startOffset); + } + } catch (Throwable t) { + LOG.warn("Failed to construct new object of type " + + cls.getName(), t); + } + } + return null; + } + + + /** + * Get {@link BlockReaderLocalLegacy} for short circuited local reads. + * This block reader implements the path-based style of local reads + * first introduced in HDFS-2246. + */ + private BlockReader getLegacyBlockReaderLocal() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": trying to construct BlockReaderLocalLegacy"); + } + if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " + + "the address " + inetSocketAddress + " is not local"); + } + return null; + } + if (clientContext.getDisableLegacyBlockReaderLocal()) { + PerformanceAdvisory.LOG.debug("{}: can't construct " + + "BlockReaderLocalLegacy because " + + "disableLegacyBlockReaderLocal is set.", this); + return null; + } + IOException ioe; + try { + return BlockReaderLocalLegacy.newBlockReader(conf, + userGroupInformation, configuration, fileName, block, token, + datanode, startOffset, length, storageType); + } catch (RemoteException remoteException) { + ioe = remoteException.unwrapRemoteException( + InvalidToken.class, AccessControlException.class); + } catch (IOException e) { + ioe = e; + } + if ((!(ioe instanceof AccessControlException)) && + isSecurityException(ioe)) { + // Handle security exceptions. 
+ // We do not handle AccessControlException here, since + // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate + // that the user is not in dfs.block.local-path-access.user, a condition + // which requires us to disable legacy SCR. + throw ioe; + } + LOG.warn(this + ": error creating legacy BlockReaderLocal. " + + "Disabling legacy local reads.", ioe); + clientContext.setDisableLegacyBlockReaderLocal(); + return null; + } + + private BlockReader getBlockReaderLocal() throws InvalidToken { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": trying to construct a BlockReaderLocal " + + "for short-circuit reads."); + } + if (pathInfo == null) { + pathInfo = clientContext.getDomainSocketFactory() + .getPathInfo(inetSocketAddress, conf.getShortCircuitConf()); + } + if (!pathInfo.getPathState().getUsableForShortCircuit()) { + PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " + + "giving up on BlockReaderLocal.", this, pathInfo); + return null; + } + ShortCircuitCache cache = clientContext.getShortCircuitCache(); + ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); + ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this); + InvalidToken exc = info.getInvalidTokenException(); + if (exc != null) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": got InvalidToken exception while trying to " + + "construct BlockReaderLocal via " + pathInfo.getPath()); + } + throw exc; + } + if (info.getReplica() == null) { + PerformanceAdvisory.LOG.debug("{}: failed to get " + + "ShortCircuitReplica. Cannot construct " + + "BlockReaderLocal via {}", this, pathInfo.getPath()); + return null; + } + return new BlockReaderLocal.Builder(conf.getShortCircuitConf()). + setFilename(fileName). + setBlock(block). + setStartOffset(startOffset). + setShortCircuitReplica(info.getReplica()). + setVerifyChecksum(verifyChecksum). + setCachingStrategy(cachingStrategy). + setStorageType(storageType). 
+ build(); + } + + /** + * Fetch a pair of short-circuit block descriptors from a local DataNode. + * + * @return Null if we could not communicate with the datanode, + * a new ShortCircuitReplicaInfo object otherwise. + * ShortCircuitReplicaInfo objects may contain either an InvalidToken + * exception, or a ShortCircuitReplica object ready to use. + */ + @Override + public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { + if (createShortCircuitReplicaInfoCallback != null) { + ShortCircuitReplicaInfo info = + createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo(); + if (info != null) return info; + } + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": trying to create ShortCircuitReplicaInfo."); + } + BlockReaderPeer curPeer; + while (true) { + curPeer = nextDomainPeer(); + if (curPeer == null) break; + if (curPeer.fromCache) remainingCacheTries--; + DomainPeer peer = (DomainPeer)curPeer.peer; + Slot slot = null; + ShortCircuitCache cache = clientContext.getShortCircuitCache(); + try { + MutableBoolean usedPeer = new MutableBoolean(false); + slot = cache.allocShmSlot(datanode, peer, usedPeer, + new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()), + clientName); + if (usedPeer.booleanValue()) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": allocShmSlot used up our previous socket " + + peer.getDomainSocket() + ". Allocating a new one..."); + } + curPeer = nextDomainPeer(); + if (curPeer == null) break; + peer = (DomainPeer)curPeer.peer; + } + ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot); + clientContext.getPeerCache().put(datanode, peer); + return info; + } catch (IOException e) { + if (slot != null) { + cache.freeSlot(slot); + } + if (curPeer.fromCache) { + // Handle an I/O error we got when using a cached socket. + // These are considered less serious, because the socket may be stale. 
+ if (LOG.isDebugEnabled()) { + LOG.debug(this + ": closing stale domain peer " + peer, e); + } + IOUtilsClient.cleanup(LOG, peer); + } else { + // Handle an I/O error we got when using a newly created socket. + // We temporarily disable the domain socket path for a few minutes in + // this case, to prevent wasting more time on it. + LOG.warn(this + ": I/O error requesting file descriptors. " + + "Disabling domain socket " + peer.getDomainSocket(), e); + IOUtilsClient.cleanup(LOG, peer); + clientContext.getDomainSocketFactory() + .disableDomainSocketPath(pathInfo.getPath()); + return null; + } + } + } + return null; + } + + /** + * Request file descriptors from a DomainPeer. + * + * @param peer The peer to use for communication. + * @param slot If non-null, the shared memory slot to associate with the + * new ShortCircuitReplica. + * + * @return A ShortCircuitReplica object if we could communicate with the + * datanode; null, otherwise. + * @throws IOException If we encountered an I/O exception while communicating + * with the datanode. + */ + private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer, + Slot slot) throws IOException { + ShortCircuitCache cache = clientContext.getShortCircuitCache(); + final DataOutputStream out = + new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); + SlotId slotId = slot == null ? 
null : slot.getSlotId(); + new Sender(out).requestShortCircuitFds(block, token, slotId, 1, + failureInjector.getSupportsReceiptVerification()); + DataInputStream in = new DataInputStream(peer.getInputStream()); + BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( + PBHelperClient.vintPrefixed(in)); + DomainSocket sock = peer.getDomainSocket(); + failureInjector.injectRequestFileDescriptorsFailure(); + switch (resp.getStatus()) { + case SUCCESS: + byte buf[] = new byte[1]; + FileInputStream fis[] = new FileInputStream[2]; + sock.recvFileInputStreams(fis, buf, 0, buf.length); + ShortCircuitReplica replica = null; + try { + ExtendedBlockId key = + new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); + if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) { + LOG.trace("Sending receipt verification byte for slot " + slot); + sock.getOutputStream().write(0); + } + replica = new ShortCircuitReplica(key, fis[0], fis[1], cache, + Time.monotonicNow(), slot); + return new ShortCircuitReplicaInfo(replica); + } catch (IOException e) { + // This indicates an error reading from disk, or a format error. Since + // it's not a socket communication problem, we return null rather than + // throwing an exception. + LOG.warn(this + ": error creating ShortCircuitReplica.", e); + return null; + } finally { + if (replica == null) { + IOUtilsClient.cleanup(DFSClient.LOG, fis[0], fis[1]); + } + } + case ERROR_UNSUPPORTED: + if (!resp.hasShortCircuitAccessVersion()) { + LOG.warn("short-circuit read access is disabled for " + + "DataNode " + datanode + ". reason: " + resp.getMessage()); + clientContext.getDomainSocketFactory() + .disableShortCircuitForPath(pathInfo.getPath()); + } else { + LOG.warn("short-circuit read access for the file " + + fileName + " is disabled for DataNode " + datanode + + ". 
reason: " + resp.getMessage()); + } + return null; + case ERROR_ACCESS_TOKEN: + String msg = "access control error while " + + "attempting to set up short-circuit access to " + + fileName + resp.getMessage(); + if (LOG.isDebugEnabled()) { + LOG.debug(this + ":" + msg); + } + return new ShortCircuitReplicaInfo(new InvalidToken(msg)); + default: + LOG.warn(this + ": unknown response code " + resp.getStatus() + + " while attempting to set up short-circuit access. " + + resp.getMessage()); + clientContext.getDomainSocketFactory() + .disableShortCircuitForPath(pathInfo.getPath()); + return null; + } + } + + /** + * Get a RemoteBlockReader that communicates over a UNIX domain socket. + * + * @return The new BlockReader, or null if we failed to create the block + * reader. + * + * @throws InvalidToken If the block token was invalid. + * Potentially other security-related execptions. + */ + private BlockReader getRemoteBlockReaderFromDomain() throws IOException { + if (pathInfo == null) { + pathInfo = clientContext.getDomainSocketFactory() + .getPathInfo(inetSocketAddress, conf.getShortCircuitConf()); + } + if (!pathInfo.getPathState().getUsableForDataTransfer()) { + PerformanceAdvisory.LOG.debug("{}: not trying to create a " + + "remote block reader because the UNIX domain socket at {}" + + " is not usable.", this, pathInfo); + return null; + } + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": trying to create a remote block reader from the " + + "UNIX domain socket at " + pathInfo.getPath()); + } + + while (true) { + BlockReaderPeer curPeer = nextDomainPeer(); + if (curPeer == null) break; + if (curPeer.fromCache) remainingCacheTries--; + DomainPeer peer = (DomainPeer)curPeer.peer; + BlockReader blockReader = null; + try { + blockReader = getRemoteBlockReader(peer); + return blockReader; + } catch (IOException ioe) { + IOUtilsClient.cleanup(LOG, peer); + if (isSecurityException(ioe)) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": got security exception while 
constructing " + + "a remote block reader from the unix domain socket at " + + pathInfo.getPath(), ioe); + } + throw ioe; + } + if (curPeer.fromCache) { + // Handle an I/O error we got when using a cached peer. These are + // considered less serious, because the underlying socket may be stale. + if (LOG.isDebugEnabled()) { + LOG.debug("Closed potentially stale domain peer " + peer, ioe); + } + } else { + // Handle an I/O error we got when using a newly created domain peer. + // We temporarily disable the domain socket path for a few minutes in + // this case, to prevent wasting more time on it. + LOG.warn("I/O error constructing remote block reader. Disabling " + + "domain socket " + peer.getDomainSocket(), ioe); + clientContext.getDomainSocketFactory() + .disableDomainSocketPath(pathInfo.getPath()); + return null; + } + } finally { + if (blockReader == null) { + IOUtilsClient.cleanup(LOG, peer); + } + } + } + return null; + } + + /** + * Get a RemoteBlockReader that communicates over a TCP socket. + * + * @return The new BlockReader. We will not return null, but instead throw + * an exception if this fails. + * + * @throws InvalidToken + * If the block token was invalid. + * InvalidEncryptionKeyException + * If the encryption key was invalid. + * Other IOException + * If there was another problem. 
+ */ + private BlockReader getRemoteBlockReaderFromTcp() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": trying to create a remote block reader from a " + + "TCP socket"); + } + BlockReader blockReader = null; + while (true) { + BlockReaderPeer curPeer = null; + Peer peer = null; + try { + curPeer = nextTcpPeer(); + if (curPeer.fromCache) remainingCacheTries--; + peer = curPeer.peer; + blockReader = getRemoteBlockReader(peer); + return blockReader; + } catch (IOException ioe) { + if (isSecurityException(ioe)) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": got security exception while constructing " + + "a remote block reader from " + peer, ioe); + } + throw ioe; + } + if ((curPeer != null) && curPeer.fromCache) { + // Handle an I/O error we got when using a cached peer. These are + // considered less serious, because the underlying socket may be + // stale. + if (LOG.isDebugEnabled()) { + LOG.debug("Closed potentially stale remote peer " + peer, ioe); + } + } else { + // Handle an I/O error we got when using a newly created peer. + LOG.warn("I/O error constructing remote block reader.", ioe); + throw ioe; + } + } finally { + if (blockReader == null) { + IOUtilsClient.cleanup(LOG, peer); + } + } + } + } + + public static class BlockReaderPeer { + final Peer peer; + final boolean fromCache; + + BlockReaderPeer(Peer peer, boolean fromCache) { + this.peer = peer; + this.fromCache = fromCache; + } + } + + /** + * Get the next DomainPeer-- either from the cache or by creating it. + * + * @return the next DomainPeer, or null if we could not construct one. + */ + private BlockReaderPeer nextDomainPeer() { + if (remainingCacheTries > 0) { + Peer peer = clientContext.getPeerCache().get(datanode, true); + if (peer != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("nextDomainPeer: reusing existing peer " + peer); + } + return new BlockReaderPeer(peer, true); + } + } + DomainSocket sock = clientContext.getDomainSocketFactory(). 
+ createSocket(pathInfo, conf.getSocketTimeout()); + if (sock == null) return null; + return new BlockReaderPeer(new DomainPeer(sock), false); + } + + /** + * Get the next TCP-based peer-- either from the cache or by creating it. + * + * @return the next Peer, or null if we could not construct one. + * + * @throws IOException If there was an error while constructing the peer + * (such as an InvalidEncryptionKeyException) + */ + private BlockReaderPeer nextTcpPeer() throws IOException { + if (remainingCacheTries > 0) { + Peer peer = clientContext.getPeerCache().get(datanode, false); + if (peer != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("nextTcpPeer: reusing existing peer " + peer); + } + return new BlockReaderPeer(peer, true); + } + } + try { + Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token, + datanode); + if (LOG.isTraceEnabled()) { + LOG.trace("nextTcpPeer: created newConnectedPeer " + peer); + } + return new BlockReaderPeer(peer, false); + } catch (IOException e) { + if (LOG.isTraceEnabled()) { + LOG.trace("nextTcpPeer: failed to create newConnectedPeer " + + "connected to " + datanode); + } + throw e; + } + } + + /** + * Determine if an exception is security-related. + * + * We need to handle these exceptions differently than other IOExceptions. + * They don't indicate a communication problem. Instead, they mean that there + * is some action the client needs to take, such as refetching block tokens, + * renewing encryption keys, etc. + * + * @param ioe The exception + * @return True only if the exception is security-related. 
+ */ + private static boolean isSecurityException(IOException ioe) { + return (ioe instanceof InvalidToken) || + (ioe instanceof InvalidEncryptionKeyException) || + (ioe instanceof InvalidBlockTokenException) || + (ioe instanceof AccessControlException); + } + + @SuppressWarnings("deprecation") + private BlockReader getRemoteBlockReader(Peer peer) throws IOException { + if (conf.getShortCircuitConf().isUseLegacyBlockReader()) { + return RemoteBlockReader.newBlockReader(fileName, + block, token, startOffset, length, conf.getIoBufferSize(), + verifyChecksum, clientName, peer, datanode, + clientContext.getPeerCache(), cachingStrategy); + } else { + return RemoteBlockReader2.newBlockReader( + fileName, block, token, startOffset, length, + verifyChecksum, clientName, peer, datanode, + clientContext.getPeerCache(), cachingStrategy); + } + } + + @Override + public String toString() { + return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")"; + } + + /** + * File name to print when accessing a block directly (from servlets) + * @param s Address of the block location + * @param poolId Block pool ID of the block + * @param blockId Block ID of the block + * @return string that has a file name for debug purposes + */ + public static String getFileName(final InetSocketAddress s, + final String poolId, final long blockId) { + return s.toString() + ":" + poolId + ":" + blockId; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java new file mode 100644 index 0000000..807ede8 --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
@@ -0,0 +1,369 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.HdfsVolumeId;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that query datanodes for per-replica volume information and
 * combine the answers with {@link LocatedBlock}s into
 * {@link BlockStorageLocation}s.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
class BlockStorageLocationUtil {

  static final Logger LOG = LoggerFactory
      .getLogger(BlockStorageLocationUtil.class);

  /**
   * Create a list of {@link VolumeBlockLocationCallable} corresponding to a set
   * of datanodes and blocks. The blocks must all correspond to the same
   * block pool.
   *
   * @param conf
   *          configuration handed to each callable
   * @param datanodeBlocks
   *          Map of datanodes to block replicas at each datanode
   * @param timeout
   *          timeout handed to each callable for its RPC proxy
   * @param connectToDnViaHostname
   *          handed to each callable for its RPC proxy
   * @param parent
   *          trace span under which each callable opens its own span
   * @return callables Used to query each datanode for location information on
   *         the block replicas at the datanode
   * @throws IllegalArgumentException
   *           if the blocks listed for one datanode span several block pools
   */
  private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables(
      Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
      int timeout, boolean connectToDnViaHostname, Span parent) {

    if (datanodeBlocks.isEmpty()) {
      return Lists.newArrayList();
    }

    // Construct the callables, one per datanode
    List<VolumeBlockLocationCallable> callables =
        new ArrayList<VolumeBlockLocationCallable>();
    for (Map.Entry<DatanodeInfo, List<LocatedBlock>> entry : datanodeBlocks
        .entrySet()) {
      // Construct RPC parameters
      DatanodeInfo datanode = entry.getKey();
      List<LocatedBlock> locatedBlocks = entry.getValue();
      if (locatedBlocks.isEmpty()) {
        continue;
      }

      // Ensure that the blocks all are from the same block pool.
      String poolId = locatedBlocks.get(0).getBlock().getBlockPoolId();
      for (LocatedBlock lb : locatedBlocks) {
        if (!poolId.equals(lb.getBlock().getBlockPoolId())) {
          throw new IllegalArgumentException(
              "All blocks to be queried must be in the same block pool: " +
              locatedBlocks.get(0).getBlock() + " and " + lb +
              " are from different pools.");
        }
      }

      // blockIds[k] and dnTokens.get(k) describe the same block.
      long[] blockIds = new long[locatedBlocks.size()];
      int i = 0;
      List<Token<BlockTokenIdentifier>> dnTokens =
          new ArrayList<Token<BlockTokenIdentifier>>(
          locatedBlocks.size());
      for (LocatedBlock b : locatedBlocks) {
        blockIds[i++] = b.getBlock().getBlockId();
        dnTokens.add(b.getBlockToken());
      }
      VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable(
          conf, datanode, poolId, blockIds, dnTokens, timeout,
          connectToDnViaHostname, parent);
      callables.add(callable);
    }
    return callables;
  }

  /**
   * Queries datanodes for the blocks specified in <code>datanodeBlocks</code>,
   * making one RPC to each datanode. These RPCs are made in parallel using a
   * threadpool.
   *
   * Datanodes whose query was cancelled or failed for any other reason are
   * left out of the result, so the returned map may be partial.
   *
   * @param conf
   *          configuration used to build the RPC proxies
   * @param datanodeBlocks
   *          Map of datanodes to the blocks present on the DN
   * @param poolsize
   *          number of threads used to issue the RPCs
   * @param timeoutMs
   *          time limit in milliseconds, both for the whole batch and for
   *          each RPC proxy
   * @param connectToDnViaHostname
   *          handed to the RPC proxy factory
   * @return metadatas Map of datanodes to block metadata of the DN
   * @throws InvalidBlockTokenException
   *           if client does not have read access on a requested block
   * @throws UnsupportedOperationException
   *           if a datanode's query failed with that exception
   */
  static Map<DatanodeInfo, HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
      Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
      int poolsize, int timeoutMs, boolean connectToDnViaHostname)
        throws InvalidBlockTokenException {

    List<VolumeBlockLocationCallable> callables =
        createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs,
            connectToDnViaHostname, Trace.currentSpan());

    // Use a thread pool to execute the Callables in parallel
    List<Future<HdfsBlocksMetadata>> futures =
        new ArrayList<Future<HdfsBlocksMetadata>>();
    ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize);
    try {
      futures = executor.invokeAll(callables, timeoutMs,
          TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // We can still return partial (here: empty) results, but the caller
      // must be able to observe the interruption.
      Thread.currentThread().interrupt();
    } finally {
      // Always release the pool's threads, even if invokeAll failed with an
      // unchecked exception.
      executor.shutdown();
    }

    Map<DatanodeInfo, HdfsBlocksMetadata> metadatas =
        Maps.newHashMapWithExpectedSize(datanodeBlocks.size());
    // Fill in metadatas with results from DN RPCs, where possible.
    // futures.get(i) belongs to callables.get(i).
    for (int i = 0; i < futures.size(); i++) {
      VolumeBlockLocationCallable callable = callables.get(i);
      DatanodeInfo datanode = callable.getDatanodeInfo();
      Future<HdfsBlocksMetadata> future = futures.get(i);
      try {
        HdfsBlocksMetadata metadata = future.get();
        metadatas.put(datanode, metadata);
      } catch (CancellationException e) {
        LOG.info("Cancelled while waiting for datanode "
            + datanode.getIpcAddr(false) + ": " + e.toString());
      } catch (ExecutionException e) {
        Throwable t = e.getCause();
        if (t instanceof InvalidBlockTokenException) {
          LOG.warn("Invalid access token when trying to retrieve "
              + "information from datanode " + datanode.getIpcAddr(false));
          throw (InvalidBlockTokenException) t;
        } else if (t instanceof UnsupportedOperationException) {
          LOG.info("Datanode " + datanode.getIpcAddr(false) + " does not support"
              + " required #getHdfsBlocksMetadata() API");
          throw (UnsupportedOperationException) t;
        } else {
          LOG.info("Failed to query block locations on datanode " +
              datanode.getIpcAddr(false) + ": " + t);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Could not fetch information from datanode", t);
        }
      } catch (InterruptedException e) {
        // Shouldn't happen, because invokeAll waits for all Futures to be ready
        LOG.info("Interrupted while fetching HdfsBlocksMetadata");
        Thread.currentThread().interrupt();
      }
    }

    return metadatas;
  }

  /**
   * Group the per-replica {@link VolumeId} info returned from
   * {@link #queryDatanodesForHdfsBlocksMetadata} to be associated
   * with the corresponding {@link LocatedBlock}.
   *
   * @param blocks
   *          Original LocatedBlock array
   * @param metadatas
   *          VolumeId information for the replicas on each datanode
   * @return blockVolumeIds per-replica VolumeId information associated with the
   *         parent LocatedBlock; replicas for which no volume information was
   *         received keep a null entry
   */
  static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks(
      List<LocatedBlock> blocks,
      Map<DatanodeInfo, HdfsBlocksMetadata> metadatas) {

    // Initialize mapping of block ID to LocatedBlock.
    // Used to associate results from DN RPCs to the parent LocatedBlock
    Map<Long, LocatedBlock> blockIdToLocBlock =
        new HashMap<Long, LocatedBlock>();
    for (LocatedBlock b : blocks) {
      blockIdToLocBlock.put(b.getBlock().getBlockId(), b);
    }

    // Initialize the mapping of blocks -> list of VolumeIds, one per replica
    // This is filled out with real values from the DN RPCs
    Map<LocatedBlock, List<VolumeId>> blockVolumeIds =
        new HashMap<LocatedBlock, List<VolumeId>>();
    for (LocatedBlock b : blocks) {
      ArrayList<VolumeId> l = new ArrayList<VolumeId>(b.getLocations().length);
      for (int i = 0; i < b.getLocations().length; i++) {
        l.add(null);
      }
      blockVolumeIds.put(b, l);
    }

    // Iterate through the list of metadatas (one per datanode).
    // For each metadata, if it's valid, insert its volume location information
    // into the Map returned to the caller
    for (Map.Entry<DatanodeInfo, HdfsBlocksMetadata> entry : metadatas.entrySet()) {
      DatanodeInfo datanode = entry.getKey();
      HdfsBlocksMetadata metadata = entry.getValue();
      // Check if metadata is valid
      if (metadata == null) {
        continue;
      }
      long[] metaBlockIds = metadata.getBlockIds();
      List<byte[]> metaVolumeIds = metadata.getVolumeIds();
      List<Integer> metaVolumeIndexes = metadata.getVolumeIndexes();
      // Add VolumeId for each replica in the HdfsBlocksMetadata
      for (int j = 0; j < metaBlockIds.length; j++) {
        int volumeIndex = metaVolumeIndexes.get(j);
        long blockId = metaBlockIds[j];
        // Skip if block wasn't found, or not a valid index into metaVolumeIds
        // (including a negative one, which would otherwise make the lookup
        // below throw).
        // Also skip if the DN responded with a block we didn't ask for
        if (volumeIndex == Integer.MAX_VALUE
            || volumeIndex < 0
            || volumeIndex >= metaVolumeIds.size()
            || !blockIdToLocBlock.containsKey(blockId)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("No data for block " + blockId);
          }
          continue;
        }
        // Get the VolumeId by indexing into the list of VolumeIds
        // provided by the datanode
        byte[] volumeId = metaVolumeIds.get(volumeIndex);
        HdfsVolumeId id = new HdfsVolumeId(volumeId);
        // Find out which index we are in the LocatedBlock's replicas
        LocatedBlock locBlock = blockIdToLocBlock.get(blockId);
        DatanodeInfo[] dnInfos = locBlock.getLocations();
        int index = -1;
        for (int k = 0; k < dnInfos.length; k++) {
          if (dnInfos[k].equals(datanode)) {
            index = k;
            break;
          }
        }
        if (index < 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Datanode responded with a block volume id we did" +
                " not request, omitting.");
          }
          continue;
        }
        // Place VolumeId at the same index as the DN's index in the list of
        // replicas
        List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
        volumeIds.set(index, id);
      }
    }
    return blockVolumeIds;
  }

  /**
   * Helper method to combine a list of {@link LocatedBlock} with associated
   * {@link VolumeId} information to form a list of {@link BlockStorageLocation}.
   *
   * @param blocks
   *          the located blocks, in the order the result should follow
   * @param blockVolumeIds
   *          per-block VolumeId lists; must hold an entry for every element
   *          of <code>blocks</code>
   * @return one BlockStorageLocation per block location
   * @throws IOException
   *           if converting the blocks to BlockLocations fails
   */
  static BlockStorageLocation[] convertToVolumeBlockLocations(
      List<LocatedBlock> blocks,
      Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException {
    // Construct the final return value of VolumeBlockLocation[]
    BlockLocation[] locations = DFSUtilClient.locatedBlocks2Locations(blocks);
    List<BlockStorageLocation> volumeBlockLocs =
        new ArrayList<BlockStorageLocation>(locations.length);
    for (int i = 0; i < locations.length; i++) {
      LocatedBlock locBlock = blocks.get(i);
      List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
      BlockStorageLocation bsLoc = new BlockStorageLocation(locations[i],
          volumeIds.toArray(new VolumeId[0]));
      volumeBlockLocs.add(bsLoc);
    }
    return volumeBlockLocs.toArray(new BlockStorageLocation[] {});
  }

  /**
   * Callable that sets up an RPC proxy to a datanode and queries it for
   * volume location information for a list of ExtendedBlocks.
   */
  private static class VolumeBlockLocationCallable implements
    Callable<HdfsBlocksMetadata> {

    private final Configuration configuration;
    private final int timeout;
    private final DatanodeInfo datanode;
    private final String poolId;
    private final long[] blockIds;
    private final List<Token<BlockTokenIdentifier>> dnTokens;
    private final boolean connectToDnViaHostname;
    private final Span parentSpan;

    VolumeBlockLocationCallable(Configuration configuration,
        DatanodeInfo datanode, String poolId, long []blockIds,
        List<Token<BlockTokenIdentifier>> dnTokens, int timeout,
        boolean connectToDnViaHostname, Span parentSpan) {
      this.configuration = configuration;
      this.timeout = timeout;
      this.datanode = datanode;
      this.poolId = poolId;
      this.blockIds = blockIds;
      this.dnTokens = dnTokens;
      this.connectToDnViaHostname = connectToDnViaHostname;
      this.parentSpan = parentSpan;
    }

    public DatanodeInfo getDatanodeInfo() {
      return datanode;
    }

    /**
     * Open an RPC proxy to the datanode, fetch the block metadata and stop
     * the proxy again. Exceptions propagate to the caller through the
     * Future.
     */
    @Override
    public HdfsBlocksMetadata call() throws Exception {
      // Create the RPC proxy and make the RPC
      ClientDatanodeProtocol cdp = null;
      TraceScope scope =
          Trace.startSpan("getHdfsBlocksMetadata", parentSpan);
      try {
        cdp = DFSUtilClient.createClientDatanodeProtocolProxy(
            datanode, configuration,
            timeout, connectToDnViaHostname);
        return cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
      } finally {
        scope.close();
        if (cdp != null) {
          RPC.stopProxy(cdp);
        }
      }
    }
  }
}
