http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java deleted file mode 100644 index 3b09953..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ /dev/null @@ -1,3200 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT; - -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.Proxy; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.net.URI; -import java.net.UnknownHostException; -import java.security.GeneralSecurityException; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.net.SocketFactory; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.HadoopIllegalArgumentException; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.crypto.CipherSuite; -import org.apache.hadoop.crypto.CryptoCodec; -import org.apache.hadoop.crypto.CryptoInputStream; -import org.apache.hadoop.crypto.CryptoOutputStream; -import org.apache.hadoop.crypto.CryptoProtocolVersion; -import org.apache.hadoop.crypto.key.KeyProvider; -import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; -import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; -import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.BlockStorageLocation; -import org.apache.hadoop.fs.CacheFlag; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileEncryptionInfo; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FsServerDefaults; -import org.apache.hadoop.fs.FsStatus; -import org.apache.hadoop.fs.HdfsBlockLocation; -import org.apache.hadoop.fs.InvalidPathException; -import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum; -import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; -import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum; -import org.apache.hadoop.fs.Options; -import org.apache.hadoop.fs.Options.ChecksumOpt; -import org.apache.hadoop.fs.ParentNotDirectoryException; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.fs.UnresolvedLinkException; -import org.apache.hadoop.fs.VolumeId; -import org.apache.hadoop.fs.XAttr; -import org.apache.hadoop.fs.XAttrSetFlag; -import org.apache.hadoop.fs.permission.AclEntry; -import org.apache.hadoop.fs.permission.AclStatus; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.client.HdfsDataInputStream; -import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf; -import org.apache.hadoop.hdfs.client.impl.LeaseRenewer; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.AclException; -import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; -import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; -import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; -import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator; -import org.apache.hadoop.hdfs.protocol.CachePoolEntry; -import org.apache.hadoop.hdfs.protocol.CachePoolInfo; -import org.apache.hadoop.hdfs.protocol.CachePoolIterator; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; -import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.DirectoryListing; -import org.apache.hadoop.hdfs.protocol.EncryptionZone; -import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; -import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; -import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; -import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; -import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; -import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; -import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; -import org.apache.hadoop.hdfs.protocol.datatransfer.Op; -import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; -import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; -import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; -import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; -import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.server.namenode.SafeModeException; -import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.EnumSetWritable; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.MD5Hash; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.ipc.RpcInvocationHandler; -import org.apache.hadoop.net.DNS; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.SecretManager.InvalidToken; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenRenewer; -import org.apache.hadoop.tracing.SpanReceiverHost; -import org.apache.hadoop.tracing.TraceUtils; -import org.apache.hadoop.util.Daemon; -import org.apache.hadoop.util.DataChecksum; -import org.apache.hadoop.util.DataChecksum.Type; -import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.Time; -import org.apache.htrace.Sampler; -import org.apache.htrace.SamplerBuilder; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.net.InetAddresses; - -/******************************************************** - * DFSClient can connect to a Hadoop Filesystem and - * perform basic file tasks. It uses the ClientProtocol - * to communicate with a NameNode daemon, and connects - * directly to DataNodes to read/write block data. - * - * Hadoop DFS users should obtain an instance of - * DistributedFileSystem, which uses DFSClient to handle - * filesystem tasks. - * - ********************************************************/ [email protected] -public class DFSClient implements java.io.Closeable, RemotePeerFactory, - DataEncryptionKeyFactory { - public static final Log LOG = LogFactory.getLog(DFSClient.class); - public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour - - private final Configuration conf; - private final DfsClientConf dfsClientConf; - final ClientProtocol namenode; - /* The service used for delegation tokens */ - private Text dtService; - - final UserGroupInformation ugi; - volatile boolean clientRunning = true; - volatile long lastLeaseRenewal; - private volatile FsServerDefaults serverDefaults; - private volatile long serverDefaultsLastUpdate; - final String clientName; - final SocketFactory socketFactory; - final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure; - final FileSystem.Statistics stats; - private final String authority; - private final Random r = new Random(); - private SocketAddress[] localInterfaceAddrs; - private DataEncryptionKey encryptionKey; - final SaslDataTransferClient saslClient; - private final CachingStrategy defaultReadCachingStrategy; - private final CachingStrategy defaultWriteCachingStrategy; - private final ClientContext clientContext; - - private static final DFSHedgedReadMetrics HEDGED_READ_METRIC = - new DFSHedgedReadMetrics(); - private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL; - private final Sampler<?> traceSampler; - private final int smallBufferSize; - - public DfsClientConf getConf() { - return dfsClientConf; - } - - Configuration getConfiguration() { - return conf; - } - - /** - * A map from file names to {@link DFSOutputStream} objects - * that are currently being written by this client. - * Note that a file can only be written by a single client. - */ - private final Map<Long, DFSOutputStream> filesBeingWritten - = new HashMap<Long, DFSOutputStream>(); - - /** - * Same as this(NameNode.getNNAddress(conf), conf); - * @see #DFSClient(InetSocketAddress, Configuration) - * @deprecated Deprecated at 0.21 - */ - @Deprecated - public DFSClient(Configuration conf) throws IOException { - this(DFSUtilClient.getNNAddress(conf), conf); - } - - public DFSClient(InetSocketAddress address, Configuration conf) throws IOException { - this(DFSUtilClient.getNNUri(address), conf); - } - - /** - * Same as this(nameNodeUri, conf, null); - * @see #DFSClient(URI, Configuration, FileSystem.Statistics) - */ - public DFSClient(URI nameNodeUri, Configuration conf - ) throws IOException { - this(nameNodeUri, conf, null); - } - - /** - * Same as this(nameNodeUri, null, conf, stats); - * @see #DFSClient(URI, ClientProtocol, Configuration, FileSystem.Statistics) - */ - public DFSClient(URI nameNodeUri, Configuration conf, - FileSystem.Statistics stats) - throws IOException { - this(nameNodeUri, null, conf, stats); - } - - /** - * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode. - * If HA is enabled and a positive value is set for - * {@link HdfsClientConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY} - * in the configuration, the DFSClient will use - * {@link LossyRetryInvocationHandler} as its RetryInvocationHandler. - * Otherwise one of nameNodeUri or rpcNamenode must be null. - */ - @VisibleForTesting - public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, - Configuration conf, FileSystem.Statistics stats) - throws IOException { - SpanReceiverHost.get(conf, HdfsClientConfigKeys.DFS_CLIENT_HTRACE_PREFIX); - traceSampler = new SamplerBuilder(TraceUtils. - wrapHadoopConf(HdfsClientConfigKeys.DFS_CLIENT_HTRACE_PREFIX, conf)) - .build(); - // Copy only the required DFSClient configuration - this.dfsClientConf = new DfsClientConf(conf); - this.conf = conf; - this.stats = stats; - this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); - this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf); - this.smallBufferSize = DFSUtil.getSmallBufferSize(conf); - - this.ugi = UserGroupInformation.getCurrentUser(); - - this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority(); - this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" + - ThreadLocalRandom.current().nextInt() + "_" + - Thread.currentThread().getId(); - int numResponseToDrop = conf.getInt( - HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, - HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT); - ProxyAndInfo<ClientProtocol> proxyInfo = null; - AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false); - if (numResponseToDrop > 0) { - // This case is used for testing. - LOG.warn(HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY - + " is set to " + numResponseToDrop - + ", this hacked client will proactively drop responses"); - proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf, - nameNodeUri, ClientProtocol.class, numResponseToDrop, - nnFallbackToSimpleAuth); - } - - if (proxyInfo != null) { - this.dtService = proxyInfo.getDelegationTokenService(); - this.namenode = proxyInfo.getProxy(); - } else if (rpcNamenode != null) { - // This case is used for testing. - Preconditions.checkArgument(nameNodeUri == null); - this.namenode = rpcNamenode; - dtService = null; - } else { - Preconditions.checkArgument(nameNodeUri != null, - "null URI"); - proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf, - nameNodeUri, nnFallbackToSimpleAuth); - this.dtService = proxyInfo.getDelegationTokenService(); - this.namenode = proxyInfo.getProxy(); - } - - String localInterfaces[] = - conf.getTrimmedStrings(HdfsClientConfigKeys.DFS_CLIENT_LOCAL_INTERFACES); - localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces); - if (LOG.isDebugEnabled() && 0 != localInterfaces.length) { - LOG.debug("Using local interfaces [" + - Joiner.on(',').join(localInterfaces)+ "] with addresses [" + - Joiner.on(',').join(localInterfaceAddrs) + "]"); - } - - Boolean readDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ? - null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false); - Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ? - null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0); - Boolean writeDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ? - null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false); - this.defaultReadCachingStrategy = - new CachingStrategy(readDropBehind, readahead); - this.defaultWriteCachingStrategy = - new CachingStrategy(writeDropBehind, readahead); - this.clientContext = ClientContext.get( - conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT), - dfsClientConf); - - if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) { - this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize()); - } - this.saslClient = new SaslDataTransferClient( - conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), - TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth); - } - - /** - * Return the socket addresses to use with each configured - * local interface. Local interfaces may be specified by IP - * address, IP address range using CIDR notation, interface - * name (e.g. eth0) or sub-interface name (e.g. eth0:0). - * The socket addresses consist of the IPs for the interfaces - * and the ephemeral port (port 0). If an IP, IP range, or - * interface name matches an interface with sub-interfaces - * only the IP of the interface is used. Sub-interfaces can - * be used by specifying them explicitly (by IP or name). - * - * @return SocketAddresses for the configured local interfaces, - * or an empty array if none are configured - * @throws UnknownHostException if a given interface name is invalid - */ - private static SocketAddress[] getLocalInterfaceAddrs( - String interfaceNames[]) throws UnknownHostException { - List<SocketAddress> localAddrs = new ArrayList<SocketAddress>(); - for (String interfaceName : interfaceNames) { - if (InetAddresses.isInetAddress(interfaceName)) { - localAddrs.add(new InetSocketAddress(interfaceName, 0)); - } else if (NetUtils.isValidSubnet(interfaceName)) { - for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) { - localAddrs.add(new InetSocketAddress(addr, 0)); - } - } else { - for (String ip : DNS.getIPs(interfaceName, false)) { - localAddrs.add(new InetSocketAddress(ip, 0)); - } - } - } - return localAddrs.toArray(new SocketAddress[localAddrs.size()]); - } - - /** - * Select one of the configured local interfaces at random. We use a random - * interface because other policies like round-robin are less effective - * given that we cache connections to datanodes. - * - * @return one of the local interface addresses at random, or null if no - * local interfaces are configured - */ - SocketAddress getRandomLocalInterfaceAddr() { - if (localInterfaceAddrs.length == 0) { - return null; - } - final int idx = r.nextInt(localInterfaceAddrs.length); - final SocketAddress addr = localInterfaceAddrs[idx]; - if (LOG.isDebugEnabled()) { - LOG.debug("Using local interface " + addr); - } - return addr; - } - - /** - * Return the timeout that clients should use when writing to datanodes. - * @param numNodes the number of nodes in the pipeline. - */ - int getDatanodeWriteTimeout(int numNodes) { - final int t = dfsClientConf.getDatanodeSocketWriteTimeout(); - return t > 0? t + HdfsConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0; - } - - int getDatanodeReadTimeout(int numNodes) { - final int t = dfsClientConf.getSocketTimeout(); - return t > 0? HdfsConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0; - } - - @VisibleForTesting - public String getClientName() { - return clientName; - } - - void checkOpen() throws IOException { - if (!clientRunning) { - IOException result = new IOException("Filesystem closed"); - throw result; - } - } - - /** Return the lease renewer instance. The renewer thread won't start - * until the first output stream is created. The same instance will - * be returned until all output streams are closed. - */ - public LeaseRenewer getLeaseRenewer() throws IOException { - return LeaseRenewer.getInstance(authority, ugi, this); - } - - /** Get a lease and start automatic renewal */ - private void beginFileLease(final long inodeId, final DFSOutputStream out) - throws IOException { - getLeaseRenewer().put(inodeId, out, this); - } - - /** Stop renewal of lease for the file. */ - void endFileLease(final long inodeId) throws IOException { - getLeaseRenewer().closeFile(inodeId, this); - } - - - /** Put a file. Only called from LeaseRenewer, where proper locking is - * enforced to consistently update its local dfsclients array and - * client's filesBeingWritten map. - */ - public void putFileBeingWritten(final long inodeId, final DFSOutputStream out) { - synchronized(filesBeingWritten) { - filesBeingWritten.put(inodeId, out); - // update the last lease renewal time only when there was no - // writes. once there is one write stream open, the lease renewer - // thread keeps it updated well with in anyone's expiration time. - if (lastLeaseRenewal == 0) { - updateLastLeaseRenewal(); - } - } - } - - /** Remove a file. Only called from LeaseRenewer. */ - public void removeFileBeingWritten(final long inodeId) { - synchronized(filesBeingWritten) { - filesBeingWritten.remove(inodeId); - if (filesBeingWritten.isEmpty()) { - lastLeaseRenewal = 0; - } - } - } - - /** Is file-being-written map empty? */ - public boolean isFilesBeingWrittenEmpty() { - synchronized(filesBeingWritten) { - return filesBeingWritten.isEmpty(); - } - } - - /** @return true if the client is running */ - public boolean isClientRunning() { - return clientRunning; - } - - long getLastLeaseRenewal() { - return lastLeaseRenewal; - } - - void updateLastLeaseRenewal() { - synchronized(filesBeingWritten) { - if (filesBeingWritten.isEmpty()) { - return; - } - lastLeaseRenewal = Time.monotonicNow(); - } - } - - /** - * Renew leases. - * @return true if lease was renewed. May return false if this - * client has been closed or has no files open. - **/ - public boolean renewLease() throws IOException { - if (clientRunning && !isFilesBeingWrittenEmpty()) { - try { - namenode.renewLease(clientName); - updateLastLeaseRenewal(); - return true; - } catch (IOException e) { - // Abort if the lease has already expired. - final long elapsed = Time.monotonicNow() - getLastLeaseRenewal(); - if (elapsed > HdfsConstants.LEASE_HARDLIMIT_PERIOD) { - LOG.warn("Failed to renew lease for " + clientName + " for " - + (elapsed/1000) + " seconds (>= hard-limit =" - + (HdfsConstants.LEASE_HARDLIMIT_PERIOD / 1000) + " seconds.) " - + "Closing all files being written ...", e); - closeAllFilesBeingWritten(true); - } else { - // Let the lease renewer handle it and retry. - throw e; - } - } - } - return false; - } - - /** - * Close connections the Namenode. - */ - void closeConnectionToNamenode() { - RPC.stopProxy(namenode); - } - - /** Close/abort all files being written. */ - public void closeAllFilesBeingWritten(final boolean abort) { - for(;;) { - final long inodeId; - final DFSOutputStream out; - synchronized(filesBeingWritten) { - if (filesBeingWritten.isEmpty()) { - return; - } - inodeId = filesBeingWritten.keySet().iterator().next(); - out = filesBeingWritten.remove(inodeId); - } - if (out != null) { - try { - if (abort) { - out.abort(); - } else { - out.close(); - } - } catch(IOException ie) { - LOG.error("Failed to " + (abort ? "abort" : "close") + " file: " - + out.getSrc() + " with inode: " + inodeId, ie); - } - } - } - } - - /** - * Close the file system, abandoning all of the leases and files being - * created and close connections to the namenode. - */ - @Override - public synchronized void close() throws IOException { - if(clientRunning) { - closeAllFilesBeingWritten(false); - clientRunning = false; - getLeaseRenewer().closeClient(this); - // close connections to the namenode - closeConnectionToNamenode(); - } - } - - /** - * Close all open streams, abandoning all of the leases and files being - * created. - * @param abort whether streams should be gracefully closed - */ - public void closeOutputStreams(boolean abort) { - if (clientRunning) { - closeAllFilesBeingWritten(abort); - } - } - - /** - * @see ClientProtocol#getPreferredBlockSize(String) - */ - public long getBlockSize(String f) throws IOException { - TraceScope scope = getPathTraceScope("getBlockSize", f); - try { - return namenode.getPreferredBlockSize(f); - } catch (IOException ie) { - LOG.warn("Problem getting block size", ie); - throw ie; - } finally { - scope.close(); - } - } - - /** - * Get server default values for a number of configuration params. - * @see ClientProtocol#getServerDefaults() - */ - public FsServerDefaults getServerDefaults() throws IOException { - long now = Time.monotonicNow(); - if ((serverDefaults == null) || - (now - serverDefaultsLastUpdate > SERVER_DEFAULTS_VALIDITY_PERIOD)) { - serverDefaults = namenode.getServerDefaults(); - serverDefaultsLastUpdate = now; - } - assert serverDefaults != null; - return serverDefaults; - } - - /** - * Get a canonical token service name for this client's tokens. Null should - * be returned if the client is not using tokens. - * @return the token service for the client - */ - @InterfaceAudience.LimitedPrivate( { "HDFS" }) - public String getCanonicalServiceName() { - return (dtService != null) ? dtService.toString() : null; - } - - /** - * @see ClientProtocol#getDelegationToken(Text) - */ - public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) - throws IOException { - assert dtService != null; - TraceScope scope = Trace.startSpan("getDelegationToken", traceSampler); - try { - Token<DelegationTokenIdentifier> token = - namenode.getDelegationToken(renewer); - if (token != null) { - token.setService(this.dtService); - LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token)); - } else { - LOG.info("Cannot get delegation token from " + renewer); - } - return token; - } finally { - scope.close(); - } - } - - /** - * Renew a delegation token - * @param token the token to renew - * @return the new expiration time - * @throws InvalidToken - * @throws IOException - * @deprecated Use Token.renew instead. - */ - @Deprecated - public long renewDelegationToken(Token<DelegationTokenIdentifier> token) - throws InvalidToken, IOException { - LOG.info("Renewing " + DelegationTokenIdentifier.stringifyToken(token)); - try { - return token.renew(conf); - } catch (InterruptedException ie) { - throw new RuntimeException("caught interrupted", ie); - } catch (RemoteException re) { - throw re.unwrapRemoteException(InvalidToken.class, - AccessControlException.class); - } - } - - /** - * Cancel a delegation token - * @param token the token to cancel - * @throws InvalidToken - * @throws IOException - * @deprecated Use Token.cancel instead. - */ - @Deprecated - public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) - throws InvalidToken, IOException { - LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(token)); - try { - token.cancel(conf); - } catch (InterruptedException ie) { - throw new RuntimeException("caught interrupted", ie); - } catch (RemoteException re) { - throw re.unwrapRemoteException(InvalidToken.class, - AccessControlException.class); - } - } - - @InterfaceAudience.Private - public static class Renewer extends TokenRenewer { - - static { - //Ensure that HDFS Configuration files are loaded before trying to use - // the renewer. - HdfsConfiguration.init(); - } - - @Override - public boolean handleKind(Text kind) { - return DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind); - } - - @SuppressWarnings("unchecked") - @Override - public long renew(Token<?> token, Configuration conf) throws IOException { - Token<DelegationTokenIdentifier> delToken = - (Token<DelegationTokenIdentifier>) token; - ClientProtocol nn = getNNProxy(delToken, conf); - try { - return nn.renewDelegationToken(delToken); - } catch (RemoteException re) { - throw re.unwrapRemoteException(InvalidToken.class, - AccessControlException.class); - } - } - - @SuppressWarnings("unchecked") - @Override - public void cancel(Token<?> token, Configuration conf) throws IOException { - Token<DelegationTokenIdentifier> delToken = - (Token<DelegationTokenIdentifier>) token; - LOG.info("Cancelling " + - DelegationTokenIdentifier.stringifyToken(delToken)); - ClientProtocol nn = getNNProxy(delToken, conf); - try { - nn.cancelDelegationToken(delToken); - } catch (RemoteException re) { - throw re.unwrapRemoteException(InvalidToken.class, - AccessControlException.class); - } - } - - private static ClientProtocol getNNProxy( - Token<DelegationTokenIdentifier> token, Configuration conf) - throws IOException { - URI uri = HAUtilClient.getServiceUriFromToken( - HdfsConstants.HDFS_URI_SCHEME, token); - if (HAUtilClient.isTokenForLogicalUri(token) && - !HAUtilClient.isLogicalUri(conf, uri)) { - // If the token is for a logical nameservice, but the configuration - // we have disagrees about that, we can't actually renew it. - // This can be the case in MR, for example, if the RM doesn't - // have all of the HA clusters configured in its configuration. - throw new IOException("Unable to map logical nameservice URI '" + - uri + "' to a NameNode. Local configuration does not have " + - "a failover proxy provider configured."); - } - - ProxyAndInfo<ClientProtocol> info = - NameNodeProxiesClient.createProxyWithClientProtocol(conf, uri, null); - assert info.getDelegationTokenService().equals(token.getService()) : - "Returned service '" + info.getDelegationTokenService().toString() + - "' doesn't match expected service '" + - token.getService().toString() + "'"; - - return info.getProxy(); - } - - @Override - public boolean isManaged(Token<?> token) throws IOException { - return true; - } - - } - - /** - * Report corrupt blocks that were discovered by the client. - * @see ClientProtocol#reportBadBlocks(LocatedBlock[]) - */ - public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { - namenode.reportBadBlocks(blocks); - } - - public LocatedBlocks getLocatedBlocks(String src, long start) - throws IOException { - return getLocatedBlocks(src, start, dfsClientConf.getPrefetchSize()); - } - - /* - * This is just a wrapper around callGetBlockLocations, but non-static so that - * we can stub it out for tests. - */ - @VisibleForTesting - public LocatedBlocks getLocatedBlocks(String src, long start, long length) - throws IOException { - TraceScope scope = getPathTraceScope("getBlockLocations", src); - try { - return callGetBlockLocations(namenode, src, start, length); - } finally { - scope.close(); - } - } - - /** - * @see ClientProtocol#getBlockLocations(String, long, long) - */ - static LocatedBlocks callGetBlockLocations(ClientProtocol namenode, - String src, long start, long length) - throws IOException { - try { - return namenode.getBlockLocations(src, start, length); - } catch(RemoteException re) { - throw re.unwrapRemoteException(AccessControlException.class, - FileNotFoundException.class, - UnresolvedPathException.class); - } - } - - /** - * Recover a file's lease - * @param src a file's path - * @return true if the file is already closed - * @throws IOException - */ - boolean recoverLease(String src) throws IOException { - checkOpen(); - - TraceScope scope = getPathTraceScope("recoverLease", src); - try { - return namenode.recoverLease(src, clientName); - } catch (RemoteException re) { - throw re.unwrapRemoteException(FileNotFoundException.class, - AccessControlException.class, - UnresolvedPathException.class); - } finally { - scope.close(); - } - } - - /** - * Get block location info about file - * - * getBlockLocations() returns a list of hostnames that store - * data for a specific file region. It returns a set of hostnames - * for every block within the indicated region. - * - * This function is very useful when writing code that considers - * data-placement when performing operations. For example, the - * MapReduce system tries to schedule tasks on the same machines - * as the data-block the task processes. - */ - public BlockLocation[] getBlockLocations(String src, long start, - long length) throws IOException, UnresolvedLinkException { - TraceScope scope = getPathTraceScope("getBlockLocations", src); - try { - LocatedBlocks blocks = getLocatedBlocks(src, start, length); - BlockLocation[] locations = DFSUtilClient.locatedBlocks2Locations(blocks); - HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length]; - for (int i = 0; i < locations.length; i++) { - hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i)); - } - return hdfsLocations; - } finally { - scope.close(); - } - } - - /** - * Get block location information about a list of {@link HdfsBlockLocation}. - * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to - * get {@link BlockStorageLocation}s for blocks returned by - * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)} - * . - * - * This is done by making a round of RPCs to the associated datanodes, asking - * the volume of each block replica. The returned array of - * {@link BlockStorageLocation} expose this information as a - * {@link VolumeId}. - * - * @param blockLocations - * target blocks on which to query volume location information - * @return volumeBlockLocations original block array augmented with additional - * volume location information for each replica. - */ - public BlockStorageLocation[] getBlockStorageLocations( - List<BlockLocation> blockLocations) throws IOException, - UnsupportedOperationException, InvalidBlockTokenException { - if (!getConf().isHdfsBlocksMetadataEnabled()) { - throw new UnsupportedOperationException("Datanode-side support for " + - "getVolumeBlockLocations() must also be enabled in the client " + - "configuration."); - } - // Downcast blockLocations and fetch out required LocatedBlock(s) - List<LocatedBlock> blocks = new ArrayList<LocatedBlock>(); - for (BlockLocation loc : blockLocations) { - if (!(loc instanceof HdfsBlockLocation)) { - throw new ClassCastException("DFSClient#getVolumeBlockLocations " + - "expected to be passed HdfsBlockLocations"); - } - HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc; - blocks.add(hdfsLoc.getLocatedBlock()); - } - - // Re-group the LocatedBlocks to be grouped by datanodes, with the values - // a list of the LocatedBlocks on the datanode. - Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks = - new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>(); - for (LocatedBlock b : blocks) { - for (DatanodeInfo info : b.getLocations()) { - if (!datanodeBlocks.containsKey(info)) { - datanodeBlocks.put(info, new ArrayList<LocatedBlock>()); - } - List<LocatedBlock> l = datanodeBlocks.get(info); - l.add(b); - } - } - - // Make RPCs to the datanodes to get volume locations for its replicas - TraceScope scope = - Trace.startSpan("getBlockStorageLocations", traceSampler); - Map<DatanodeInfo, HdfsBlocksMetadata> metadatas; - try { - metadatas = BlockStorageLocationUtil. - queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, - getConf().getFileBlockStorageLocationsNumThreads(), - getConf().getFileBlockStorageLocationsTimeoutMs(), - getConf().isConnectToDnViaHostname()); - if (LOG.isTraceEnabled()) { - LOG.trace("metadata returned: " - + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas)); - } - } finally { - scope.close(); - } - - // Regroup the returned VolumeId metadata to again be grouped by - // LocatedBlock rather than by datanode - Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil - .associateVolumeIdsWithBlocks(blocks, metadatas); - - // Combine original BlockLocations with new VolumeId information - BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil - .convertToVolumeBlockLocations(blocks, blockVolumeIds); - - return volumeBlockLocations; - } - - /** - * Decrypts a EDEK by consulting the KeyProvider. - */ - private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo - feInfo) throws IOException { - TraceScope scope = Trace.startSpan("decryptEDEK", traceSampler); - try { - KeyProvider provider = getKeyProvider(); - if (provider == null) { - throw new IOException("No KeyProvider is configured, cannot access" + - " an encrypted file"); - } - EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption( - feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(), - feInfo.getEncryptedDataEncryptionKey()); - try { - KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension - .createKeyProviderCryptoExtension(provider); - return cryptoProvider.decryptEncryptedKey(ekv); - } catch (GeneralSecurityException e) { - throw new IOException(e); - } - } finally { - scope.close(); - } - } - - /** - * Obtain the crypto protocol version from the provided FileEncryptionInfo, - * checking to see if this version is supported by. - * - * @param feInfo FileEncryptionInfo - * @return CryptoProtocolVersion from the feInfo - * @throws IOException if the protocol version is unsupported. - */ - private static CryptoProtocolVersion getCryptoProtocolVersion - (FileEncryptionInfo feInfo) throws IOException { - final CryptoProtocolVersion version = feInfo.getCryptoProtocolVersion(); - if (!CryptoProtocolVersion.supports(version)) { - throw new IOException("Client does not support specified " + - "CryptoProtocolVersion " + version.getDescription() + " version " + - "number" + version.getVersion()); - } - return version; - } - - /** - * Obtain a CryptoCodec based on the CipherSuite set in a FileEncryptionInfo - * and the available CryptoCodecs configured in the Configuration. - * - * @param conf Configuration - * @param feInfo FileEncryptionInfo - * @return CryptoCodec - * @throws IOException if no suitable CryptoCodec for the CipherSuite is - * available. - */ - private static CryptoCodec getCryptoCodec(Configuration conf, - FileEncryptionInfo feInfo) throws IOException { - final CipherSuite suite = feInfo.getCipherSuite(); - if (suite.equals(CipherSuite.UNKNOWN)) { - throw new IOException("NameNode specified unknown CipherSuite with ID " - + suite.getUnknownValue() + ", cannot instantiate CryptoCodec."); - } - final CryptoCodec codec = CryptoCodec.getInstance(conf, suite); - if (codec == null) { - throw new UnknownCipherSuiteException( - "No configuration found for the cipher suite " - + suite.getConfigSuffix() + " prefixed with " - + HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX - + ". Please see the example configuration " - + "hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE " - + "at core-default.xml for details."); - } - return codec; - } - - /** - * Wraps the stream in a CryptoInputStream if the underlying file is - * encrypted. - */ - public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis) - throws IOException { - final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo(); - if (feInfo != null) { - // File is encrypted, wrap the stream in a crypto stream. - // Currently only one version, so no special logic based on the version # - getCryptoProtocolVersion(feInfo); - final CryptoCodec codec = getCryptoCodec(conf, feInfo); - final KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo); - final CryptoInputStream cryptoIn = - new CryptoInputStream(dfsis, codec, decrypted.getMaterial(), - feInfo.getIV()); - return new HdfsDataInputStream(cryptoIn); - } else { - // No FileEncryptionInfo so no encryption. - return new HdfsDataInputStream(dfsis); - } - } - - /** - * Wraps the stream in a CryptoOutputStream if the underlying file is - * encrypted. - */ - public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos, - FileSystem.Statistics statistics) throws IOException { - return createWrappedOutputStream(dfsos, statistics, 0); - } - - /** - * Wraps the stream in a CryptoOutputStream if the underlying file is - * encrypted. - */ - public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos, - FileSystem.Statistics statistics, long startPos) throws IOException { - final FileEncryptionInfo feInfo = dfsos.getFileEncryptionInfo(); - if (feInfo != null) { - // File is encrypted, wrap the stream in a crypto stream. - // Currently only one version, so no special logic based on the version # - getCryptoProtocolVersion(feInfo); - final CryptoCodec codec = getCryptoCodec(conf, feInfo); - KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo); - final CryptoOutputStream cryptoOut = - new CryptoOutputStream(dfsos, codec, - decrypted.getMaterial(), feInfo.getIV(), startPos); - return new HdfsDataOutputStream(cryptoOut, statistics, startPos); - } else { - // No FileEncryptionInfo present so no encryption. - return new HdfsDataOutputStream(dfsos, statistics, startPos); - } - } - - public DFSInputStream open(String src) - throws IOException, UnresolvedLinkException { - return open(src, dfsClientConf.getIoBufferSize(), true, null); - } - - /** - * Create an input stream that obtains a nodelist from the - * namenode, and then reads from all the right places. Creates - * inner subclass of InputStream that does the right out-of-band - * work. - * @deprecated Use {@link #open(String, int, boolean)} instead. - */ - @Deprecated - public DFSInputStream open(String src, int buffersize, boolean verifyChecksum, - FileSystem.Statistics stats) - throws IOException, UnresolvedLinkException { - return open(src, buffersize, verifyChecksum); - } - - - /** - * Create an input stream that obtains a nodelist from the - * namenode, and then reads from all the right places. Creates - * inner subclass of InputStream that does the right out-of-band - * work. - */ - public DFSInputStream open(String src, int buffersize, boolean verifyChecksum) - throws IOException, UnresolvedLinkException { - checkOpen(); - // Get block info from namenode - TraceScope scope = getPathTraceScope("newDFSInputStream", src); - try { - return new DFSInputStream(this, src, verifyChecksum, null); - } finally { - scope.close(); - } - } - - /** - * Get the namenode associated with this DFSClient object - * @return the namenode associated with this DFSClient object - */ - public ClientProtocol getNamenode() { - return namenode; - } - - /** - * Call {@link #create(String, boolean, short, long, Progressable)} with - * default <code>replication</code> and <code>blockSize<code> and null <code> - * progress</code>. - */ - public OutputStream create(String src, boolean overwrite) - throws IOException { - return create(src, overwrite, dfsClientConf.getDefaultReplication(), - dfsClientConf.getDefaultBlockSize(), null); - } - - /** - * Call {@link #create(String, boolean, short, long, Progressable)} with - * default <code>replication</code> and <code>blockSize<code>. - */ - public OutputStream create(String src, - boolean overwrite, - Progressable progress) throws IOException { - return create(src, overwrite, dfsClientConf.getDefaultReplication(), - dfsClientConf.getDefaultBlockSize(), progress); - } - - /** - * Call {@link #create(String, boolean, short, long, Progressable)} with - * null <code>progress</code>. - */ - public OutputStream create(String src, - boolean overwrite, - short replication, - long blockSize) throws IOException { - return create(src, overwrite, replication, blockSize, null); - } - - /** - * Call {@link #create(String, boolean, short, long, Progressable, int)} - * with default bufferSize. - */ - public OutputStream create(String src, boolean overwrite, short replication, - long blockSize, Progressable progress) throws IOException { - return create(src, overwrite, replication, blockSize, progress, - dfsClientConf.getIoBufferSize()); - } - - /** - * Call {@link #create(String, FsPermission, EnumSet, short, long, - * Progressable, int, ChecksumOpt)} with default <code>permission</code> - * {@link FsPermission#getFileDefault()}. - * - * @param src File name - * @param overwrite overwrite an existing file if true - * @param replication replication factor for the file - * @param blockSize maximum block size - * @param progress interface for reporting client progress - * @param buffersize underlying buffersize - * - * @return output stream - */ - public OutputStream create(String src, - boolean overwrite, - short replication, - long blockSize, - Progressable progress, - int buffersize) - throws IOException { - return create(src, FsPermission.getFileDefault(), - overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) - : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress, - buffersize, null); - } - - /** - * Call {@link #create(String, FsPermission, EnumSet, boolean, short, - * long, Progressable, int, ChecksumOpt)} with <code>createParent</code> - * set to true. - */ - public DFSOutputStream create(String src, - FsPermission permission, - EnumSet<CreateFlag> flag, - short replication, - long blockSize, - Progressable progress, - int buffersize, - ChecksumOpt checksumOpt) - throws IOException { - return create(src, permission, flag, true, - replication, blockSize, progress, buffersize, checksumOpt, null); - } - - /** - * Create a new dfs file with the specified block replication - * with write-progress reporting and return an output stream for writing - * into the file. - * - * @param src File name - * @param permission The permission of the directory being created. - * If null, use default permission {@link FsPermission#getFileDefault()} - * @param flag indicates create a new file or create/overwrite an - * existing file or append to an existing file - * @param createParent create missing parent directory if true - * @param replication block replication - * @param blockSize maximum block size - * @param progress interface for reporting client progress - * @param buffersize underlying buffer size - * @param checksumOpt checksum options - * - * @return output stream - * - * @see ClientProtocol#create for detailed description of exceptions thrown - */ - public DFSOutputStream create(String src, - FsPermission permission, - EnumSet<CreateFlag> flag, - boolean createParent, - short replication, - long blockSize, - Progressable progress, - int buffersize, - ChecksumOpt checksumOpt) throws IOException { - return create(src, permission, flag, createParent, replication, blockSize, - progress, buffersize, checksumOpt, null); - } - - private FsPermission applyUMask(FsPermission permission) { - if (permission == null) { - permission = FsPermission.getFileDefault(); - } - return permission.applyUMask(dfsClientConf.getUMask()); - } - - /** - * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long, - * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is - * a hint to where the namenode should place the file blocks. - * The favored nodes hint is not persisted in HDFS. Hence it may be honored - * at the creation time only. HDFS could move the blocks during balancing or - * replication, to move the blocks from favored nodes. A value of null means - * no favored nodes for this create - */ - public DFSOutputStream create(String src, - FsPermission permission, - EnumSet<CreateFlag> flag, - boolean createParent, - short replication, - long blockSize, - Progressable progress, - int buffersize, - ChecksumOpt checksumOpt, - InetSocketAddress[] favoredNodes) throws IOException { - checkOpen(); - final FsPermission masked = applyUMask(permission); - if(LOG.isDebugEnabled()) { - LOG.debug(src + ": masked=" + masked); - } - final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, - src, masked, flag, createParent, replication, blockSize, progress, - buffersize, dfsClientConf.createChecksum(checksumOpt), - getFavoredNodesStr(favoredNodes)); - beginFileLease(result.getFileId(), result); - return result; - } - - private String[] getFavoredNodesStr(InetSocketAddress[] favoredNodes) { - String[] favoredNodeStrs = null; - if (favoredNodes != null) { - favoredNodeStrs = new String[favoredNodes.length]; - for (int i = 0; i < favoredNodes.length; i++) { - favoredNodeStrs[i] = - favoredNodes[i].getHostName() + ":" - + favoredNodes[i].getPort(); - } - } - return favoredNodeStrs; - } - - /** - * Append to an existing file if {@link CreateFlag#APPEND} is present - */ - private DFSOutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag, - int buffersize, Progressable progress) throws IOException { - if (flag.contains(CreateFlag.APPEND)) { - HdfsFileStatus stat = getFileInfo(src); - if (stat == null) { // No file to append to - // New file needs to be created if create option is present - if (!flag.contains(CreateFlag.CREATE)) { - throw new FileNotFoundException("failed to append to non-existent file " - + src + " on client " + clientName); - } - return null; - } - return callAppend(src, buffersize, flag, progress, null); - } - return null; - } - - /** - * Same as {{@link #create(String, FsPermission, EnumSet, short, long, - * Progressable, int, ChecksumOpt)} except that the permission - * is absolute (ie has already been masked with umask. - */ - public DFSOutputStream primitiveCreate(String src, - FsPermission absPermission, - EnumSet<CreateFlag> flag, - boolean createParent, - short replication, - long blockSize, - Progressable progress, - int buffersize, - ChecksumOpt checksumOpt) - throws IOException, UnresolvedLinkException { - checkOpen(); - CreateFlag.validate(flag); - DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress); - if (result == null) { - DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt); - result = DFSOutputStream.newStreamForCreate(this, src, absPermission, - flag, createParent, replication, blockSize, progress, buffersize, - checksum, null); - } - beginFileLease(result.getFileId(), result); - return result; - } - - /** - * Creates a symbolic link. - * - * @see ClientProtocol#createSymlink(String, String,FsPermission, boolean) - */ - public void createSymlink(String target, String link, boolean createParent) - throws IOException { - TraceScope scope = getPathTraceScope("createSymlink", target); - try { - final FsPermission dirPerm = applyUMask(null); - namenode.createSymlink(target, link, dirPerm, createParent); - } catch (RemoteException re) { - throw re.unwrapRemoteException(AccessControlException.class, - FileAlreadyExistsException.class, - FileNotFoundException.class, - ParentNotDirectoryException.class, - NSQuotaExceededException.class, - DSQuotaExceededException.class, - QuotaByStorageTypeExceededException.class, - UnresolvedPathException.class, - SnapshotAccessControlException.class); - } finally { - scope.close(); - } - } - - /** - * Resolve the *first* symlink, if any, in the path. - * - * @see ClientProtocol#getLinkTarget(String) - */ - public String getLinkTarget(String path) throws IOException { - checkOpen(); - TraceScope scope = getPathTraceScope("getLinkTarget", path); - try { - return namenode.getLinkTarget(path); - } catch (RemoteException re) { - throw re.unwrapRemoteException(AccessControlException.class, - FileNotFoundException.class); - } finally { - scope.close(); - } - } - - /** Method to get stream returned by append call */ - private DFSOutputStream callAppend(String src, int buffersize, - EnumSet<CreateFlag> flag, Progressable progress, String[] favoredNodes) - throws IOException { - CreateFlag.validateForAppend(flag); - try { - LastBlockWithStatus blkWithStatus = namenode.append(src, clientName, - new EnumSetWritable<>(flag, CreateFlag.class)); - return DFSOutputStream.newStreamForAppend(this, src, flag, buffersize, - progress, blkWithStatus.getLastBlock(), - blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(null), - favoredNodes); - } catch(RemoteException re) { - throw re.unwrapRemoteException(AccessControlException.class, - FileNotFoundException.class, - SafeModeException.class, - DSQuotaExceededException.class, - QuotaByStorageTypeExceededException.class, - UnsupportedOperationException.class, - UnresolvedPathException.class, - SnapshotAccessControlException.class); - } - } - - /** - * Append to an existing HDFS file. - * - * @param src file name - * @param buffersize buffer size - * @param flag indicates whether to append data to a new block instead of - * the last block - * @param progress for reporting write-progress; null is acceptable. - * @param statistics file system statistics; null is acceptable. - * @return an output stream for writing into the file - * - * @see ClientProtocol#append(String, String, EnumSetWritable) - */ - public HdfsDataOutputStream append(final String src, final int buffersize, - EnumSet<CreateFlag> flag, final Progressable progress, - final FileSystem.Statistics statistics) throws IOException { - final DFSOutputStream out = append(src, buffersize, flag, null, progress); - return createWrappedOutputStream(out, statistics, out.getInitialLen()); - } - - /** - * Append to an existing HDFS file. - * - * @param src file name - * @param buffersize buffer size - * @param flag indicates whether to append data to a new block instead of the - * last block - * @param progress for reporting write-progress; null is acceptable. - * @param statistics file system statistics; null is acceptable. - * @param favoredNodes FavoredNodes for new blocks - * @return an output stream for writing into the file - * @see ClientProtocol#append(String, String, EnumSetWritable) - */ - public HdfsDataOutputStream append(final String src, final int buffersize, - EnumSet<CreateFlag> flag, final Progressable progress, - final FileSystem.Statistics statistics, - final InetSocketAddress[] favoredNodes) throws IOException { - final DFSOutputStream out = append(src, buffersize, flag, - getFavoredNodesStr(favoredNodes), progress); - return createWrappedOutputStream(out, statistics, out.getInitialLen()); - } - - private DFSOutputStream append(String src, int buffersize, - EnumSet<CreateFlag> flag, String[] favoredNodes, Progressable progress) - throws IOException { - checkOpen(); - final DFSOutputStream result = callAppend(src, buffersize, flag, progress, - favoredNodes); - beginFileLease(result.getFileId(), result); - return result; - } - - /** - * Set replication for an existing file. - * @param src file name - * @param replication replication to set the file to - * - * @see ClientProtocol#setReplication(String, short) - */ - public boolean setReplication(String src, short replication) - throws IOException { - TraceScope scope = getPathTraceScope("setReplication", src); - try { - return namenode.setReplication(src, replication); - } catch(RemoteException re) { - throw re.unwrapRemoteException(AccessControlException.class, - FileNotFoundException.class, - SafeModeException.class, - DSQuotaExceededException.class, - QuotaByStorageTypeExceededException.class, - UnresolvedPathException.class, - SnapshotAccessControlException.class); - } finally { - scope.close(); - } - } - - /** - * Set storage policy for an existing file/directory - * @param src file/directory name - * @param policyName name of the storage policy - */ - public void setStoragePolicy(String src, String policyName) - throws IOException { - TraceScope scope = getPathTraceScope("setStoragePolicy", src); - try { - namenode.setStoragePolicy(src, policyName); - } catch (RemoteException e) { - throw e.unwrapRemoteException(AccessControlException.class, - FileNotFoundException.class, - SafeModeException.class, - NSQuotaExceededException.class, - UnresolvedPathException.class, - SnapshotAccessControlException.class); - } finally { - scope.close(); - } - } - - /** - * @param path file/directory name - * @return Get the storage policy for specified path - */ - public BlockStoragePolicy getStoragePolicy(String path) throws IOException { - checkOpen(); - TraceScope scope = getPathTraceScope("getStoragePolicy", path); - try { - return namenode.getStoragePolicy(path); - } catch (RemoteException e) { - throw e.unwrapRemoteException(AccessControlException.class, - FileNotFoundException.class, - SafeModeException.class, - UnresolvedPathException.class); - } finally { - scope.close(); - } - } - - /** - * @return All the existing storage policies - */ - public BlockStoragePolicy[] getStoragePolicies() throws IOException { - TraceScope scope = Trace.startSpan("getStoragePolicies", traceSampler); - try { - return namenode.getStoragePolicies(); - } finally { - scope.close(); - } - } - - /** - * Rename file or directory. - * @see ClientProtocol#rename(String, String) - * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead. - */ - @Deprecated - public boolean rename(String src, String dst) throws IOException { - checkOpen(); - TraceScope scope = getSrcDstTraceScope("rename", src, dst); - try { - return namenode.rename(src, dst); - } catch(RemoteException re) { - throw re.unwrapRemoteException(AccessControlException.class, - NSQuotaExceededException.class, - DSQuotaExceededException.class, - QuotaByStorageTypeExceededException.class, - UnresolvedPathException.class, - SnapshotAccessControlException.class); - } finally { - scope.close(); - } - } - - /** - * Move blocks from src to trg and delete src - * See {@link ClientProtocol#concat}. - */ - public void concat(String trg, String [] srcs) throws IOException { - checkOpen(); - TraceScope scope = Trace.startSpan("concat", traceSampler); - try { - namenode.concat(trg, srcs); - } catch(RemoteException re) { - throw re.unwrapRemoteException(AccessControlException.class, - UnresolvedPathException.class, - SnapshotAccessControlException.class); - } finally { - scope.close(); - } - } - /** - * Rename file or directory. - * @see ClientProtocol#rename2(String, String, Options.Rename...) - */ - public void rename(String src, String dst, Options.Rename... options) - throws IOException { - checkOpen(); - TraceScope scope = getSrcDstTraceScope("rename2", src, dst); - try { - namenode.rename2(src, dst, options); - } catch(RemoteException re) { - throw re.unwrapRemoteException(AccessControlException.class, - DSQuotaExceededException.class, - QuotaByStorageTypeExceededException.class, - FileAlreadyExistsException.class, - FileNotFoundException.class, - ParentNotDirectoryException.class, - SafeModeException.class, - NSQuotaExceededException.class, - UnresolvedPathException.class, - SnapshotAccessControlException.class); - } finally { - scope.close(); - } - } - - /** - * Truncate a file to an indicated size - * See {@link ClientProtocol#truncate}. - */ - public boolean truncate(String src, long newLength) throws IOException { - checkOpen(); - if (newLength < 0) { - throw new HadoopIllegalArgumentException( - "Cannot truncate to a negative file size: " + newLength + "."); - } - try { - return namenode.truncate(src, newLength, clientName); - } catch (RemoteException re) { - throw re.unwrapRemoteException(AccessControlException.class, - UnresolvedPathException.class); - } - } - - /** - * Delete file or directory. - * See {@link ClientProtocol#delete(String, boolean)}. - */ - @Deprecated - public boolean delete(String src) throws IOException { - checkOpen(); - return delete(src, true); - } - - /** - * delete file or directory. - * delete contents of the directory if non empty and recursive - * set to true - * - * @see ClientProtocol#delete(String, boolean) - */ - public boolean delete(String src, boolean recursive) throws IOException { - checkOpen(); - TraceScope scope = getPathTraceScope("delete", src); - try { - return namenode.delete(src, recursive); - } catch(RemoteException re) { - throw re.unwrapRemoteException(AccessControlException.class, - FileNotFoundException.class, - SafeModeException.class, - UnresolvedPathException.class, - SnapshotAccessControlException.class); - } finally { - scope.close(); - } - } - - /** Implemented using getFileInfo(src) - */ - public boolean exists(String src) throws IOException { - checkOpen(); - return getFileInfo(src) != null; - } - - /** - * Get a partial listing of the indicated directory - * No block locations need to be fetched - */ - public DirectoryListing listPaths(String src, byte[] startAfter) - throws IOException { - return listPaths(src, startAfter, false); - } - - /** - * Get a partial listing of the indicated directory - * - * Recommend to use HdfsFileStatus.EMPTY_NAME as startAfter - * if the application wants to fetch a listing starting from - * the first entry in the directory - * - * @see ClientProtocol#getListing(String, byte[], boolean) - */ - public DirectoryListing listPaths(String src, byte[] startAfter, - boolean needLocation) throws IOException { - checkOpen(); - TraceScope scope = getPathTraceScope("listPaths", src); - try { - return namenode.getListing(src, startAfter, needLocation); - } catch(RemoteException re) { - throw re.unwrapRemoteException(AccessControlException.class, - FileNotFoundException.class, - UnresolvedPathException.class); - } finally { - scope.close(); - } - } - - /** - * Get the file info for a specific file or directory. - * @param src The string representation of the path to the file - * @return object containing information regarding the file - * or null if file not found - * - * @see ClientProtocol#getFileInfo(String) for description of exceptions - */ - public HdfsFileStatus getFileInfo(String src) throws IOException { - checkOpen(); - TraceScope scope = getPathTraceScope("getFileInfo", src); - try { - return namenode.getFileInfo(src); - } catch(RemoteException re) { - throw re.unwrapRemoteException(AccessControlException.class, - FileNotFoundException.class, - UnresolvedPathException.class); - } finally { - scope.close(); - } - } - - /** - * Close status of a file - * @return true if file is already closed - */ - public boolean isFileClosed(String src) throws IOException{ - checkOpen(); - TraceScope scope = getPathTraceScope("isFileClosed", src); - try { - return namenode.isFileClosed(src); - } catch(RemoteException re) { - throw re.unwrapRemoteException(AccessControlException.class, - FileNotFoundException.class, - UnresolvedPathException.class); - } finally { - scope.close(); - } - } - - /** - * Get the file info for a specific file or directory. If src - * refers to a symlink then the FileStatus of the link is returned. - * @param src path to a file or directory. - * - * For description of exceptions thrown - * @see ClientProtocol#getFileLinkInfo(String) - */ - public HdfsFileStatus getFileLinkInfo(String src) throws IOException { - checkOpen(); - TraceScope scope = getPathTraceScope("getFileLinkInfo", src); - try { - return namenode.getFileLinkInfo(src); - } catch(RemoteException re) { - throw re.unwrapRemoteException(AccessControlException.class, - UnresolvedPathException.class); - } finally { - scope.close(); - } - } - - @InterfaceAudience.Private - public void clearDataEncryptionKey() { - LOG.debug("Clearing encryption key"); - synchronized (this) { - encryptionKey = null; - } - } - - /** - * @return true if data sent between this client and DNs should be encrypted, - * false otherwise. - * @throws IOException in the event of error communicating with the NN - */ - boolean shouldEncryptData() throws IOException { - FsServerDefaults d = getServerDefaults(); - return d == null ? false : d.getEncryptDataTransfer(); - } - - @Override - public DataEncryptionKey newDataEncryptionKey() throws IOException { - if (shouldEncryptData()) { - synchronized (this) { - if (encryptionKey == null || - encryptionKey.expiryDate < Time.now()) { - LOG.debug("Getting new encryption token from NN"); - encryptionKey = namenode.getDataEncryptionKey(); - } - return encryptionKey; - } - } else { - return null; - } - } - - /** - * Get the checksum of the whole file of a range of the file. Note that the - * range always starts from the beginning of the file. - * @param src The file path - * @param length the length of the range, i.e., the range is [0, length] - * @return The checksum - * @see DistributedFileSystem#getFileChecksum(Path) - */ - public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length) - throws IOException { - checkOpen(); - Preconditions.checkArgument(length >= 0); - //get block locations for the file range - LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, - length); - if (null == blockLocations) { - throw new FileNotFoundException("File does not exist: " + src); - } - if (blockLocations.isUnderConstruction()) { - throw new IOException("Fail to get checksum, since file " + src - + " is under construction."); - } - List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks(); - final DataOutputBuffer md5out = new DataOutputBuffer(); - int bytesPerCRC = -1; - DataChecksum.Type crcType = DataChecksum.Type.DEFAULT; - long crcPerBlock = 0; - boolean refetchBlocks = false; - int lastRetriedIndex = -1; - - // get block checksum for each block - long remaining = length; - if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) { - remaining = Math.min(length, blockLocations.getFileLength()); - } - for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) { - if (refetchBlocks) { // refetch to get fresh tokens - blockLocations = callGetBlockLocations(namenode, src, 0, length); - if (null == blockLocations) { - throw new FileNotFoundException("File does not exist: " + src); - } - if (blockLocations.isUnderConstruction()) { - throw new IOException("Fail to get checksum, since file " + src - + " is under construction."); - } - locatedblocks = blockLocations.getLocatedBlocks(); - refetchBlocks = false; - } - LocatedBlock lb = locatedblocks.get(i); - final ExtendedBlock block = lb.getBlock(); - if (remaining < block.getNumBytes()) { - block.setNumBytes(remaining); - } - remaining -= block.getNumBytes(); - final DatanodeInfo[] datanodes = lb.getLocations(); - - //try each datanode location of the block - final int timeout = 3000*datanodes.length + dfsClientConf.getSocketTimeout(); - boolean done = false; - for(int j = 0; !done && j < datanodes.length; j++) { - DataOutputStream out = null; - DataInputStream in = null; - - try { - //connect to a datanode - IOStreamPair pair = connectToDN(datanodes[j], timeout, lb); - out = new DataOutputStream(new BufferedOutputStream(pair.out, - smallBufferSize)); - in = new DataInputStream(pair.in); - - if (LOG.isDebugEnabled()) { - LOG.debug("write to " + datanodes[j] + ": " - + Op.BLOCK_CHECKSUM + ", block=" + block); - } - // get block MD5 - new Sender(out).blockChecksum(block, lb.getBlockToken()); - - final BlockOpResponseProto reply = - BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); - - String logInfo = "for block " + block + " from datanode " + datanodes[j]; - DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); - - OpBlockChecksumResponseProto checksumData = - reply.getChecksumResponse(); - - //read byte-per-checksum - final int bpc = checksumData.getBytesPerCrc(); - if (i == 0) { //first block - bytesPerCRC = bpc; - } - else if (bpc != bytesPerCRC) { - throw new IOException("Byte-per-checksum not matched: bpc=" + bpc - + " but bytesPerCRC=" + bytesPerCRC); - } - - //read crc-per-block - final long cpb = checksumData.getCrcPerBlock(); - if (locatedblocks.size() > 1 && i == 0) { - crcPerBlock = cpb; - } - - //read md5 - final MD5Hash md5 = new MD5Hash( - checksumData.getMd5().toByteArray()); - md5.write(md5out); - - // read crc-type - final DataChecksum.Type ct; - if (checksumData.hasCrcType()) { - ct = PBHelperClient.convert(checksumData - .getCrcType()); - } else { - LOG.debug("Retrieving checksum from an earlier-version DataNode: " + - "inferring checksum by reading first byte"); - ct = inferChecksumTypeByReading(lb, datanodes[j]); - } - - if (i == 0) { // first block - crcType = ct; - } else if (crcType != DataChecksum.Type.MIXED - && crcType != ct) { - // if crc types are mixed in a file - crcType = DataChecksum.Type.MIXED; - } - - done = true; - - if (LOG.isDebugEnabled()) { - if (i == 0) { - LOG.debug("set bytesPerCRC=" + bytesPerCRC - + ", crcPerBlock=" + crcPerBlock); - } - LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5); - } - } catch (InvalidBlockTokenException ibte) { - if (i > lastRetriedIndex) { - if (LOG.isDebugEnabled()) { - LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM " - + "for file " + src + " for block " + block - + " from datanode " + datanodes[j] - + ". Will retry the block once."); - } - lastRetriedIndex = i; - done = true; // actually it's not done; but we'll retry - i--; // repeat at i-th block - refetchBlocks = true; - break; - } - } catch (IOException ie) { - LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie); - } finally { - IOUtils.closeStream(in); - IOUtils.closeStream(out); - } - } - - if (!done) { - throw new IOException("Fail to get block MD5 for " + block); - } - } - - //compute file MD5 - final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData()); - switch (crcType) { - case CRC32: - return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC, - crcPerBlock, fileMD5); - case CRC32C: - return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC, - crcPerBlock, fileMD5); - default: - // If there is no block allocated for the file, - // return one with the magic entry that matches what previous - // hdfs versions return. - if (locatedblocks.size() == 0) { - return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5); - } - - // we should never get here since the validity was checked - // when getCrcType() was called above. - return null; - } - } - - /** - * Connect to the given datanode's datantrasfer port, and return - * the resulting IOStreamPair. This includes encryption wrapping, etc. - */ - private IOStreamPair connectToDN(DatanodeInfo dn, int timeout, - LocatedBlock lb) throws IOException { - boolean success = false; - Socket sock = null; - try { - sock = socketFactory.createSocket(); - String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname()); - if (LOG.isDebugEnabled()) { - LOG.debug("Connecting to datanode " + dnAddr); - } - NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout); - sock.setSoTimeout(timeout); - - OutputStream unbufOut = NetUtils.getOutputStream(sock); - InputStream unbufIn = NetUtils.getInputStream(sock); - IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this, - lb.getBlockToken(), dn); - success = true; - return ret; - } finally { - if (!success) { - IOUtils.closeSocket(sock); - } - } - } - - /** - * Infer the checksum type for a replica by sending an OP_READ_BLOCK - * for the first byte of that replica. This is used for compatibility - * with older HDFS versions which did not include the checksum type in - * OpBlockChecksumResponseProto. - * - * @param lb the located block - * @param dn the connected datanode - * @return the inferred checksum type - * @throws IOException if an error occurs - */ - private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn) - throws IOException { - IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), lb); - - try { - DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, - smallBufferSize)); - DataInputStream in = new DataInputStream(pair.in); - - new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, - 0, 1, true, CachingStrategy.newDefaultStrategy()); - final BlockOpResponseProto reply = - BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); - String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn; - DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); - - return PBHelperClient.convert(reply.getReadOpChecksumInfo().getChecksum().getType()); - } finally { - IOUtils.cleanup(null, pair.in, pair.out); - } - } - - /** - * Set permissions to a file or directory. - * @param src path name. - * @param permission permission to set to - * - * @see ClientProtocol#setPermission(String, FsPermission) - */ - public void setPermission(String src, FsPermission permission) - throws IOException { - checkOpen(); - TraceScope scope = getPathTraceScope("setPermission", src); - try { - namenode.setPermission(src, permission); - } catch(RemoteException re) { - throw re.unwrapRemoteException(AccessControlException.class, - FileNotFoundException.class, - SafeModeException.class, - UnresolvedPathException.class, - SnapshotAccessControlException.class); - } finally { - scope.close(); - } - } - - /** - * Set file or directory owner. - * @param src path name. - * @param username user id. - * @param groupname user group. - * - * @see ClientProtocol#setOwner(String, String, String) - */ - public void setOwner(String src, String username, String groupname) - throws IOException { - checkOpen(); - TraceScope scope = getPathTraceScope("setOwner", src); - try { - namenode.setOwner(src, username, groupname); - } catch(RemoteException re) { - throw re.unwrapRemoteException(AccessControlException.class, - FileNotFoundException.class, - SafeModeException.class, - UnresolvedPathException.class, - SnapshotAccessControlException.class); - } finally { - scope.close(); - } - } - - private long[] callGetStats() throws IOException { - checkOpen(); - TraceScope scope = Trace.startSpan("getStats", traceSampler); - try { - return namenode.getStats(); - } finally { - scope.close(); - } - } - - /** - * @see ClientProtocol#getStats() - */ - public FsStatus getDiskStatus() throws IOException { - long rawNums[] = callGetStats(); - return new FsStatus(rawNums[0], rawNums[1], rawNums[2]); - } - - /** - * Returns count of blocks with no good replicas left. Normally should be - * zero. - * @throws IOException - */ - public long getMissingBlocksCount() throws IOException { - return callGetStats()[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX]; - } - - /** - * Returns count of blocks with replication factor 1 and have - * lost the only replica. - * @throws IOException - */ - public long getMissingReplOneBlocksCount() throws IOException { - return callGetStats()[ClientProtocol. - GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX]; - } - - /** - * Returns count of blocks with one of more replica missing. - * @throws IOException - */ - public long getUnderReplicatedBlocksCount() throws IOException { - return callGetStats()[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX]; - } - - /** - * Returns count of blocks with at least one replica marked corrupt. - * @throws IOException - */ - public long getCorruptBlocksCount() throws IOException { - return callGetStats()[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX]; - } - - /** - * @return a list in which each entry describes a corrupt file/block - * @throws IOException - */ - public CorruptFileBlocks listCorruptFileBlocks(String path, - String cookie) - throws IOException { - checkOpen(); - TraceScope scope = getPathTraceScope("listCorruptFileBlocks", path); - try { - return namenode.listCorruptFileBlocks(path, cookie); - } finally { - scope.close(); - } - } - - public DatanodeInfo[] datanodeReport(DatanodeReportType type) - throws IOException { - checkOpen(); - TraceScope scope = Trace.startSpan("datanodeReport", traceSampler); - try { - return namenode.getDatanodeReport(type); - } finally { - scope.close(); - } - } - - public DatanodeStorageReport[] getDatanodeStorageReport( - DatanodeReportType type) throws IOException { - checkOpen(); - TraceScope scope = - Trace.startSpan("datanodeStorageReport", traceSampler); - try { - return namenode.getDatanodeStorageReport(type); - } finally { - scope.close(); - } - } - - /** - * Enter, leave or get safe mode. - * - * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction,boolean) - */ - public boolean setSafeMode(SafeModeAction action) throws IOException { - return setSafeMode(action, false); - } - - /** - * Enter, leave or get safe mode. - * - * @param action - * One of SafeModeAction.GET, SafeModeAction.ENTER and - * SafeModeActiob.LEAVE - * @param isChecked - * If true, then check only active namenode's safemode status, else - * check first namenode's status. - * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean) - */ - public boolean setSafeMode(SafeModeAction action, boolean isChecked) throws IOException{ - TraceScope scope = - Trace.startSpan("setSafeMode", traceSampler); - try { - return namenode.setSafeMode(action, isChecked); - } finally { - scope.close(); - } - } - - /** - * Create one snapshot. - * - * @param snapshotRoot The directory where the snapshot is to be taken - * @param snapshotName Name of the snapshot - * @return the snapshot path. - * @see ClientProtocol#createSnapshot(String, String) - */ - public String createSnapshot(String snapshotRoot, String snapshotName) - throws IOException { - checkOpen(); - TraceScope scope = Trace.startSpan("createSnapshot", traceSampler); - try { - return namenode.createSnapshot(snapshotRoot, snapshotName); - } catch(RemoteException re) { - throw re.unwrapRemoteException(); - } finally { - scope.close(); - } - } - - /** - * Delete a snapshot of a snapshottable directory. - * - * @param snapshotRoot The snapshottable directory that the - * to-be-deleted snapshot belongs to - * @param snapshotName The name of the to-be-deleted snapshot - * @throws IOException - * @see ClientProtocol#deleteSnapshot(String, String) - */ - public void deleteSnapshot(String snapshotRoot, String snapshotName) - throws IOException { - checkOpen(); - TraceScope scope = Trace.startSpan("deleteSnapshot", traceSampler); - try { - namenode.deleteSnapshot(snapshotRoot, snapshotName); - } catch(RemoteException re) { - throw re.unwrapRemoteException(); - } finally {
<TRUNCATED>
