Repository: hadoop Updated Branches: refs/heads/branch-2.8 168d8e0c0 -> fc66e76a4
HDFS-10480. Add an admin command to list currently open files. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fc66e76a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fc66e76a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fc66e76a Branch: refs/heads/branch-2.8 Commit: fc66e76a4a22781c13a07d7efeab1faad7e4c79d Parents: 168d8e0 Author: Manoj Govindassamy <[email protected]> Authored: Tue Aug 1 17:37:45 2017 -0700 Committer: Manoj Govindassamy <[email protected]> Committed: Tue Aug 1 17:37:45 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hdfs/DFSClient.java | 12 + .../hadoop/hdfs/DistributedFileSystem.java | 15 ++ .../apache/hadoop/hdfs/client/HdfsAdmin.java | 15 ++ .../hadoop/hdfs/protocol/ClientProtocol.java | 12 + .../hadoop/hdfs/protocol/OpenFileEntry.java | 58 +++++ .../hadoop/hdfs/protocol/OpenFilesIterator.java | 59 +++++ .../ClientNamenodeProtocolTranslatorPB.java | 23 ++ .../hadoop/hdfs/protocolPB/PBHelperClient.java | 17 ++ .../src/main/proto/ClientNamenodeProtocol.proto | 18 ++ .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 + ...tNamenodeProtocolServerSideTranslatorPB.java | 20 ++ .../hdfs/server/namenode/FSNamesystem.java | 45 ++++ .../hdfs/server/namenode/LeaseManager.java | 52 ++++- .../hdfs/server/namenode/NameNodeRpcServer.java | 8 + .../org/apache/hadoop/hdfs/tools/DFSAdmin.java | 60 +++++ .../src/main/resources/hdfs-default.xml | 10 + .../src/site/markdown/HDFSCommands.md | 2 + .../org/apache/hadoop/hdfs/DFSTestUtil.java | 40 +++- .../org/apache/hadoop/hdfs/TestHdfsAdmin.java | 59 +++++ .../hdfs/server/namenode/TestListOpenFiles.java | 234 +++++++++++++++++++ .../apache/hadoop/hdfs/tools/TestDFSAdmin.java | 75 ++++++ 21 files changed, 834 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 4f211a8..1e999d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -134,6 +134,8 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator; import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; @@ -3085,4 +3087,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, Tracer getTracer() { return tracer; } + + /** + * Get a remote iterator to the open files list managed by NameNode. 
+ * + * @throws IOException + */ + public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException { + checkOpen(); + return new OpenFilesIterator(namenode, tracer); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 51bc727..b464cb8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -85,6 +85,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; @@ -2556,4 +2557,18 @@ public class DistributedFileSystem extends FileSystem { DFSOpsCountStatistics getDFSOpsCountStatistics() { return storageStatistics; } + + /** + * Returns a RemoteIterator which can be used to list all open files + * currently managed by the NameNode. For large numbers of open files, + * iterator will fetch the list in batches of configured size. + * <p/> + * Since the list is fetched in batches, it does not represent a + * consistent snapshot of the all open files. + * <p/> + * This method can only be called by HDFS superusers. + */ + public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException { + return dfs.listOpenFiles(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java index 7ea13bc..189c7c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.security.AccessControlException; /** @@ -508,4 +509,18 @@ public class HdfsAdmin { dfs.mkdir(trashPath, TRASH_PERMISSION); dfs.setPermission(trashPath, TRASH_PERMISSION); } + + /** + * Returns a RemoteIterator which can be used to list all open files + * currently managed by the NameNode. For large numbers of open files, + * iterator will fetch the list in batches of configured size. 
+ * <p/> + * Since the list is fetched in batches, it does not represent a + * consistent snapshot of the all open files. + * <p/> + * This method can only be called by HDFS superusers. + */ + public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException { + return dfs.listOpenFiles(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index a0371dc..587a15c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -1514,4 +1514,16 @@ public interface ClientProtocol { */ @Idempotent QuotaUsage getQuotaUsage(String path) throws IOException; + + /** + * List open files in the system in batches. INode id is the cursor and the + * open files returned in a batch will have their INode ids greater than + * the cursor INode id. Open files can only be requested by super user and + * the the list across batches are not atomic. + * + * @param prevId the cursor INode id. + * @throws IOException + */ + @Idempotent + BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFileEntry.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFileEntry.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFileEntry.java new file mode 100644 index 0000000..14e97d3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFileEntry.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * An open file entry for use by DFSAdmin commands. 
+ */ [email protected] [email protected] +public class OpenFileEntry { + private final long id; + private final String filePath; + private final String clientName; + private final String clientMachine; + + public OpenFileEntry(long id, String filePath, + String clientName, String clientMachine) { + this.id = id; + this.filePath = filePath; + this.clientName = clientName; + this.clientMachine = clientMachine; + } + + public long getId() { + return id; + } + + public String getFilePath() { + return filePath; + } + + public String getClientMachine() { + return clientMachine; + } + + public String getClientName() { + return clientName; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java new file mode 100644 index 0000000..c24e585 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.BatchedRemoteIterator; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; + +/** + * OpenFilesIterator is a remote iterator that iterates over the open files list + * managed by the NameNode. Since the list is retrieved in batches, it does not + * represent a consistent view of all open files. 
+ */ [email protected] [email protected] +public class OpenFilesIterator extends + BatchedRemoteIterator<Long, OpenFileEntry> { + private final ClientProtocol namenode; + private final Tracer tracer; + + public OpenFilesIterator(ClientProtocol namenode, Tracer tracer) { + super(HdfsConstants.GRANDFATHER_INODE_ID); + this.namenode = namenode; + this.tracer = tracer; + } + + @Override + public BatchedEntries<OpenFileEntry> makeRequest(Long prevId) + throws IOException { + try (TraceScope ignored = tracer.newScope("listOpenFiles")) { + return namenode.listOpenFiles(prevId); + } + } + + @Override + public Long elementToPrevKey(OpenFileEntry entry) { + return entry.getId(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 011c804..67b25d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; @@ -131,10 +132,13 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCa import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto; @@ -1583,4 +1587,23 @@ public class ClientNamenodeProtocolTranslatorPB implements throw ProtobufHelper.getRemoteException(e); } } 
+ + @Override + public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) + throws IOException { + ListOpenFilesRequestProto req = + ListOpenFilesRequestProto.newBuilder().setId(prevId).build(); + try { + ListOpenFilesResponseProto response = rpcProxy.listOpenFiles(null, req); + List<OpenFileEntry> openFileEntries = + Lists.newArrayListWithCapacity(response.getEntriesCount()); + for (OpenFilesBatchResponseProto p : response.getEntriesList()) { + openFileEntries.add(PBHelperClient.convert(p)); + } + return new BatchedListEntries<>(openFileEntries, response.getHasMore()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index b8d6d37..6550219 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; @@ -109,6 +110,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Datano import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeStorageReportProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto; @@ -1071,6 +1073,21 @@ public class PBHelperClient { proto.getKeyName()); } + public static OpenFilesBatchResponseProto convert(OpenFileEntry + openFileEntry) { + return OpenFilesBatchResponseProto.newBuilder() + .setId(openFileEntry.getId()) + .setPath(openFileEntry.getFilePath()) + .setClientName(openFileEntry.getClientName()) + .setClientMachine(openFileEntry.getClientMachine()) + .build(); + } + + public static OpenFileEntry convert(OpenFilesBatchResponseProto proto) { + return new OpenFileEntry(proto.getId(), proto.getPath(), + proto.getClientName(), proto.getClientMachine()); + } + public static AclStatus convert(GetAclStatusResponseProto e) { AclStatusProto r = e.getResult(); AclStatus.Builder builder = new AclStatus.Builder(); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index f19c1c6..205ed72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -743,6 +743,22 @@ message GetEditsFromTxidResponseProto { required EventsListProto eventsList = 1; } +message ListOpenFilesRequestProto { + required int64 id = 1; +} + +message OpenFilesBatchResponseProto { + required int64 id = 1; + required string path = 2; + required string clientName = 3; + required string clientMachine = 4; +} + +message ListOpenFilesResponseProto { + repeated OpenFilesBatchResponseProto entries = 1; + required bool hasMore = 2; +} + service ClientNamenodeProtocol { rpc getBlockLocations(GetBlockLocationsRequestProto) returns(GetBlockLocationsResponseProto); @@ -895,4 +911,6 @@ service ClientNamenodeProtocol { returns(GetEditsFromTxidResponseProto); rpc getQuotaUsage(GetQuotaUsageRequestProto) returns(GetQuotaUsageResponseProto); + rpc listOpenFiles(ListOpenFilesRequestProto) + returns(ListOpenFilesResponseProto); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index af7b5e1..bb28d94 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -765,6 +765,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY; public static final int DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100; public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses"; + public static final String DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES = + "dfs.namenode.list.openfiles.num.responses"; + public static final int DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES_DEFAULT = + 1000; public static final String DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_KEY = "dfs.namenode.edekcacheloader.interval.ms"; public static final int DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT = 1000; public static final String DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY = "dfs.namenode.edekcacheloader.initial.delay.ms"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 9f7c511..0debaac 100644 
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; @@ -143,6 +144,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCa import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto; @@ -1565,4 +1568,21 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements throw new ServiceException(e); } } + + @Override + public ListOpenFilesResponseProto listOpenFiles(RpcController controller, + ListOpenFilesRequestProto req) throws ServiceException { + try { + BatchedEntries<OpenFileEntry> entries = server.listOpenFiles(req.getId()); + ListOpenFilesResponseProto.Builder builder = + ListOpenFilesResponseProto.newBuilder(); + builder.setHasMore(entries.hasMore()); + for (int i = 0; i < entries.size(); i++) { + builder.addEntries(PBHelperClient.convert(entries.get(i))); + } + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index ec678b8..385612f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -203,6 +203,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.RollingUpgradeException; import 
org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; @@ -411,6 +412,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, /** Maximum time the lock is hold to release lease. */ private final long maxLockHoldToReleaseLeaseMs; + // Batch size for open files response + private final int maxListOpenFilesResponses; + // Scan interval is not configurable. private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS); @@ -880,6 +884,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, inodeAttributeProvider = ReflectionUtils.newInstance(klass, conf); LOG.info("Using INode attribute provider: " + klass.getName()); } + this.maxListOpenFilesResponses = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES_DEFAULT + ); + Preconditions.checkArgument(maxListOpenFilesResponses > 0, + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES + + " must be a positive integer." + ); } catch(IOException e) { LOG.error(getClass().getSimpleName() + " initialization failed.", e); close(); @@ -911,6 +923,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return maxLockHoldToReleaseLeaseMs; } + public int getMaxListOpenFilesResponses() { + return maxListOpenFilesResponses; + } + void lockRetryCache() { if (retryCache != null) { retryCache.lock(); @@ -1688,6 +1704,35 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, blockManager.metaSave(out); } + /** + * List open files in the system in batches. prevId is the cursor INode id and + * the open files returned in a batch will have their INode ids greater than + * this cursor. Open files can only be requested by super user and the the + * list across batches does not represent a consistent view of all open files. + * + * @param prevId the cursor INode id. 
+ * @throws IOException + */ + BatchedListEntries<OpenFileEntry> listOpenFiles(long prevId) + throws IOException { + final String operationName = "listOpenFiles"; + checkSuperuserPrivilege(); + checkOperation(OperationCategory.READ); + readLock(); + BatchedListEntries<OpenFileEntry> batchedListEntries; + try { + checkOperation(OperationCategory.READ); + batchedListEntries = leaseManager.getUnderConstructionFiles(prevId); + } catch (AccessControlException e) { + logAuditEvent(false, operationName, null); + throw e; + } finally { + readUnlock(operationName); + } + logAuditEvent(true, operationName, null); + return batchedListEntries; + } + private String metaSaveAsString() { StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java index 5cfa498..e160c9f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java @@ -24,17 +24,19 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.PriorityQueue; import java.util.SortedMap; import java.util.TreeMap; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.util.Daemon; @@ -90,7 +92,7 @@ public class LeaseManager { } }); // INodeID -> Lease - private final HashMap<Long, Lease> leasesById = new HashMap<>(); + private final TreeMap<Long, Lease> leasesById = new TreeMap<>(); private Daemon lmthread; private volatile boolean shouldRunMonitor; @@ -151,6 +153,52 @@ public class LeaseManager { Collection<Long> getINodeIdWithLeases() {return leasesById.keySet();} + /** + * Get a batch of under construction files from the currently active leases. + * File INodeID is the cursor used to fetch new batch of results and the + * batch size is configurable using below config param. Since the list is + * fetched in batches, it does not represent a consistent view of all + * open files. 
+ * + * @see org.apache.hadoop.hdfs.DFSConfigKeys#DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES + * @param prevId the INodeID cursor + * @throws IOException + */ + public BatchedListEntries<OpenFileEntry> getUnderConstructionFiles( + final long prevId) throws IOException { + assert fsnamesystem.hasReadLock(); + SortedMap<Long, Lease> remainingLeases; + synchronized (this) { + remainingLeases = leasesById.tailMap(prevId, false); + } + Collection<Long> inodeIds = remainingLeases.keySet(); + final int numResponses = Math.min( + this.fsnamesystem.getMaxListOpenFilesResponses(), inodeIds.size()); + final List<OpenFileEntry> openFileEntries = + Lists.newArrayListWithExpectedSize(numResponses); + + int count = 0; + for (Long inodeId: inodeIds) { + final INodeFile inodeFile = + fsnamesystem.getFSDirectory().getInode(inodeId).asFile(); + if (!inodeFile.isUnderConstruction()) { + LOG.warn("The file " + inodeFile.getFullPathName() + + " is not under construction but has lease."); + continue; + } + openFileEntries.add(new OpenFileEntry( + inodeFile.getId(), inodeFile.getFullPathName(), + inodeFile.getFileUnderConstructionFeature().getClientName(), + inodeFile.getFileUnderConstructionFeature().getClientMachine())); + count++; + if (count >= numResponses) { + break; + } + } + boolean hasMore = (numResponses < remainingLeases.size()); + return new BatchedListEntries<>(openFileEntries, hasMore); + } + /** @return the lease containing src */ public synchronized Lease getLease(INodeFile src) {return leasesById.get(src.getId());} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 9db51b2..ed30b25 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -104,6 +104,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; @@ -1261,6 +1262,13 @@ public class NameNodeRpcServer implements NamenodeProtocols { } @Override // ClientProtocol + public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) + throws IOException { + checkNNStartup(); + return namesystem.listOpenFiles(prevId); + } + + @Override // ClientProtocol public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) throws IOException { checkNNStartup(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java index bb14be1..70acd90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java @@ -48,7 +48,9 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsShell; import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.FsTracer; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.shell.Command; import org.apache.hadoop.fs.shell.CommandFormat; import org.apache.hadoop.fs.shell.PathData; @@ -71,6 +73,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotException; import org.apache.hadoop.hdfs.server.namenode.NameNode; @@ -449,6 +453,7 @@ public class DFSAdmin extends FsShell { "\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" + "\t[-metasave filename]\n" + "\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]\n" + + "\t[-listOpenFiles]\n" + "\t[-help [cmd]]\n"; /** @@ -848,6 +853,45 @@ public class DFSAdmin extends FsShell { } /** + * Command to list all the open files currently managed by NameNode. + * Usage: hdfs dfsadmin -listOpenFiles + * + * @throws IOException + */ + public int listOpenFiles() throws IOException { + DistributedFileSystem dfs = getDFS(); + Configuration dfsConf = dfs.getConf(); + URI dfsUri = dfs.getUri(); + boolean isHaEnabled = HAUtilClient.isLogicalUri(dfsConf, dfsUri); + + RemoteIterator<OpenFileEntry> openFilesRemoteIterator; + if (isHaEnabled) { + ProxyAndInfo<ClientProtocol> proxy = NameNodeProxies.createNonHAProxy( + dfsConf, HAUtil.getAddressOfActive(getDFS()), ClientProtocol.class, + UserGroupInformation.getCurrentUser(), false); + openFilesRemoteIterator = new OpenFilesIterator(proxy.getProxy(), + FsTracer.get(dfsConf)); + } else { + openFilesRemoteIterator = dfs.listOpenFiles(); + } + printOpenFiles(openFilesRemoteIterator); + return 0; + } + + private void printOpenFiles(RemoteIterator<OpenFileEntry> openFilesIterator) + throws IOException { + System.out.println(String.format("%-20s\t%-20s\t%s", "Client Host", + "Client Name", "Open File Path")); + while (openFilesIterator.hasNext()) { + OpenFileEntry openFileEntry = openFilesIterator.next(); + System.out.println(String.format("%-20s\t%-20s\t%20s", + openFileEntry.getClientMachine(), + openFileEntry.getClientName(), + openFileEntry.getFilePath())); + } + } + + /** * Command to ask the namenode to set the balancer bandwidth for all of the * datanodes. 
* Usage: hdfs dfsadmin -setBalancerBandwidth bandwidth @@ -1095,6 +1139,10 @@ public class DFSAdmin extends FsShell { + "\tIf 'incremental' is specified, it will be an incremental\n" + "\tblock report; otherwise, it will be a full block report.\n"; + String listOpenFiles = "-listOpenFiles\n" + + "\tList all open files currently managed by the NameNode along\n" + + "\twith client name and client machine accessing them.\n"; + String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" + "\t\tis specified.\n"; @@ -1158,6 +1206,8 @@ public class DFSAdmin extends FsShell { System.out.println(evictWriters); } else if ("getDatanodeInfo".equalsIgnoreCase(cmd)) { System.out.println(getDatanodeInfo); + } else if ("listOpenFiles".equalsIgnoreCase(cmd)) { + System.out.println(listOpenFiles); } else if ("help".equals(cmd)) { System.out.println(help); } else { @@ -1193,6 +1243,7 @@ public class DFSAdmin extends FsShell { System.out.println(evictWriters); System.out.println(getDatanodeInfo); System.out.println(triggerBlockReport); + System.out.println(listOpenFiles); System.out.println(help); System.out.println(); ToolRunner.printGenericCommandUsage(System.out); @@ -1748,6 +1799,8 @@ public class DFSAdmin extends FsShell { } else if ("-triggerBlockReport".equals(cmd)) { System.err.println("Usage: hdfs dfsadmin" + " [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]"); + } else if ("-listOpenFiles".equals(cmd)) { + System.err.println("Usage: hdfs dfsadmin [-listOpenFiles]"); } else { System.err.println("Usage: hdfs dfsadmin"); System.err.println("Note: Administrative commands can only be run as the HDFS superuser."); @@ -1896,6 +1949,11 @@ public class DFSAdmin extends FsShell { printUsage(cmd); return exitCode; } + } else if ("-listOpenFiles".equals(cmd)) { + if (argv.length != 1) { + printUsage(cmd); + return exitCode; + } } // initialize DFSAdmin @@ -1975,6 +2033,8 @@ public class DFSAdmin extends FsShell { exitCode = reconfig(argv, i); } else if ("-triggerBlockReport".equals(cmd)) { exitCode = triggerBlockReport(argv); + } else if ("-listOpenFiles".equals(cmd)) { + exitCode = listOpenFiles(); } else if ("-help".equals(cmd)) { if (i < argv.length) { printHelp(argv[i]); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index a7d4ef5..1a24a40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2683,6 +2683,16 @@ </description> </property> + <property> + <name>dfs.namenode.list.openfiles.num.responses</name> + <value>1000</value> + <description> + When listing open files, the maximum number of open files that will be + returned in a single batch. Fetching the list incrementally in batches + improves namenode performance. 
+ </description> + </property> + <property> <name>dfs.namenode.edekcacheloader.interval.ms</name> <value>1000</value> http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md index 25710ac..46c7f41 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md @@ -352,6 +352,7 @@ Usage: [-getDatanodeInfo <datanode_host:ipc_port>] [-evictWriters <datanode_host:ipc_port>] [-triggerBlockReport [-incremental] <datanode_host:ipc_port>] + [-listOpenFiles] [-help [cmd]] | COMMAND\_OPTION | Description | @@ -387,6 +388,7 @@ Usage: | `-evictWriters` \<datanode\_host:ipc\_port\> | Make the datanode evict all clients that are writing a block. This is useful if decommissioning is hung due to slow writers. | | `-getDatanodeInfo` \<datanode\_host:ipc\_port\> | Get the information about the given datanode. See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo) for the detail. | | `-triggerBlockReport` `[-incremental]` \<datanode\_host:ipc\_port\> | Trigger a block report for the given datanode. If 'incremental' is specified, it will be otherwise, it will be a full block report. | +| `-listOpenFiles` | List all open files currently managed by the NameNode along with client name and client machine accessing them. | | `-help` [cmd] | Displays help for the given command or all commands if none is specified. | Runs a HDFS dfsadmin client. http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 9350738..93fa44c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -62,8 +62,10 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.UUID; @@ -1775,8 +1777,8 @@ public class DFSTestUtil { GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level); } - /** - * Change the length of a block at datanode dnIndex + /** + * Change the length of a block at datanode dnIndex. 
*/ public static boolean changeReplicaLength(MiniDFSCluster cluster, ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException { @@ -2055,4 +2057,38 @@ public class DFSTestUtil { assertFalse("File in trash : " + trashPath, fs.exists(trashPath)); } } + + public static Map<Path, FSDataOutputStream> createOpenFiles(FileSystem fs, + String filePrefix, int numFilesToCreate) throws IOException { + final Map<Path, FSDataOutputStream> filesCreated = new HashMap<>(); + final byte[] buffer = new byte[(int) (1024 * 1.75)]; + final Random rand = new Random(0xFEED0BACL); + for (int i = 0; i < numFilesToCreate; i++) { + Path file = new Path("/" + filePrefix + "-" + i); + FSDataOutputStream stm = fs.create(file, true, 1024, (short) 1, 1024); + rand.nextBytes(buffer); + stm.write(buffer); + filesCreated.put(file, stm); + } + return filesCreated; + } + + public static HashSet<Path> closeOpenFiles( + HashMap<Path, FSDataOutputStream> openFilesMap, + int numFilesToClose) throws IOException { + HashSet<Path> closedFiles = new HashSet<>(); + for (Iterator<Entry<Path, FSDataOutputStream>> it = + openFilesMap.entrySet().iterator(); it.hasNext();) { + Entry<Path, FSDataOutputStream> entry = it.next(); + LOG.info("Closing file: " + entry.getKey()); + entry.getValue().close(); + closedFiles.add(entry.getKey()); + it.remove(); + numFilesToClose--; + if (numFilesToClose == 0) { + break; + } + } + return closedFiles; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java index fe20c68..685ea8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java @@ -18,12 +18,14 @@ package org.apache.hadoop.hdfs; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.HashMap; import java.util.HashSet; import java.util.Set; @@ -31,11 +33,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.JavaKeyStoreProvider; import org.apache.hadoop.fs.BlockStoragePolicySpi; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.junit.After; import org.junit.Assert; @@ -49,11 +54,15 @@ public class TestHdfsAdmin { private static final Path TEST_PATH = new Path("/test"); private static final short REPL = 1; private static final int SIZE = 128; + private static final int OPEN_FILES_BATCH_SIZE = 5; private final Configuration conf = new Configuration(); private MiniDFSCluster cluster; @Before public void setUpCluster() throws IOException { + conf.setLong( + 
DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, + OPEN_FILES_BATCH_SIZE); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); cluster.waitActive(); } @@ -205,4 +214,54 @@ public class TestHdfsAdmin { Assert.assertNotNull("should not return null for an encrypted cluster", hdfsAdmin.getKeyProvider()); } + + @Test(timeout = 120000L) + public void testListOpenFiles() throws IOException { + HashSet<Path> closedFileSet = new HashSet<>(); + HashMap<Path, FSDataOutputStream> openFileMap = new HashMap<>(); + FileSystem fs = FileSystem.get(conf); + verifyOpenFiles(closedFileSet, openFileMap); + + int numClosedFiles = OPEN_FILES_BATCH_SIZE * 4; + int numOpenFiles = (OPEN_FILES_BATCH_SIZE * 3) + 1; + for (int i = 0; i < numClosedFiles; i++) { + Path filePath = new Path("/closed-file-" + i); + DFSTestUtil.createFile(fs, filePath, SIZE, REPL, 0); + closedFileSet.add(filePath); + } + verifyOpenFiles(closedFileSet, openFileMap); + + openFileMap.putAll( + DFSTestUtil.createOpenFiles(fs, "open-file-1", numOpenFiles)); + verifyOpenFiles(closedFileSet, openFileMap); + + closedFileSet.addAll(DFSTestUtil.closeOpenFiles(openFileMap, + openFileMap.size() / 2)); + verifyOpenFiles(closedFileSet, openFileMap); + + openFileMap.putAll( + DFSTestUtil.createOpenFiles(fs, "open-file-2", 10)); + verifyOpenFiles(closedFileSet, openFileMap); + + while(openFileMap.size() > 0) { + closedFileSet.addAll(DFSTestUtil.closeOpenFiles(openFileMap, 1)); + verifyOpenFiles(closedFileSet, openFileMap); + } + } + + private void verifyOpenFiles(HashSet<Path> closedFiles, + HashMap<Path, FSDataOutputStream> openFileMap) throws IOException { + HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf); + HashSet<Path> openFiles = new HashSet<>(openFileMap.keySet()); + RemoteIterator<OpenFileEntry> openFilesRemoteItr = + hdfsAdmin.listOpenFiles(); + while (openFilesRemoteItr.hasNext()) { + String filePath = openFilesRemoteItr.next().getFilePath(); + assertFalse(filePath + " should not be listed under open files!", + closedFiles.contains(filePath)); + assertTrue(filePath + " is not listed under open files!", + openFiles.remove(new Path(filePath))); + } + assertTrue("Not all open files are listed!", openFiles.isEmpty()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java new file mode 100644 index 0000000..b290194 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HAUtil; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.tools.DFSAdmin; +import org.apache.hadoop.util.ToolRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Verify open files listing. + */ +public class TestListOpenFiles { + private static final int NUM_DATA_NODES = 3; + private static final int BATCH_SIZE = 5; + private static MiniDFSCluster cluster = null; + private static DistributedFileSystem fs = null; + private static NamenodeProtocols nnRpc = null; + private static final Log LOG = LogFactory.getLog(TestListOpenFiles.class); + + @Before + public void setUp() throws IOException { + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + conf.setLong( + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, BATCH_SIZE); + cluster = new MiniDFSCluster.Builder(conf). 
+ numDataNodes(NUM_DATA_NODES).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + nnRpc = cluster.getNameNodeRpc(); + } + + @After + public void tearDown() throws IOException { + if (fs != null) { + fs.close(); + } + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test(timeout = 120000L) + public void testListOpenFilesViaNameNodeRPC() throws Exception { + HashMap<Path, FSDataOutputStream> openFiles = new HashMap<>(); + createFiles(fs, "closed", 10); + verifyOpenFiles(openFiles); + + BatchedEntries<OpenFileEntry> openFileEntryBatchedEntries = + nnRpc.listOpenFiles(0); + assertTrue("Open files list should be empty!", + openFileEntryBatchedEntries.size() == 0); + + openFiles.putAll( + DFSTestUtil.createOpenFiles(fs, "open-1", 1)); + verifyOpenFiles(openFiles); + + openFiles.putAll( + DFSTestUtil.createOpenFiles(fs, "open-2", + (BATCH_SIZE * 2 + BATCH_SIZE / 2))); + verifyOpenFiles(openFiles); + + DFSTestUtil.closeOpenFiles(openFiles, openFiles.size() / 2); + verifyOpenFiles(openFiles); + + openFiles.putAll( + DFSTestUtil.createOpenFiles(fs, "open-3", (BATCH_SIZE * 5))); + verifyOpenFiles(openFiles); + + while(openFiles.size() > 0) { + DFSTestUtil.closeOpenFiles(openFiles, 1); + verifyOpenFiles(openFiles); + } + } + + private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles) + throws IOException { + HashSet<Path> remainingFiles = new HashSet<>(openFiles.keySet()); + OpenFileEntry lastEntry = null; + BatchedEntries<OpenFileEntry> batchedEntries; + do { + if (lastEntry == null) { + batchedEntries = nnRpc.listOpenFiles(0); + } else { + batchedEntries = nnRpc.listOpenFiles(lastEntry.getId()); + } + assertTrue("Incorrect open files list size!", + batchedEntries.size() <= BATCH_SIZE); + for (int i = 0; i < batchedEntries.size(); i++) { + lastEntry = batchedEntries.get(i); + String filePath = lastEntry.getFilePath(); + LOG.info("OpenFile: " + filePath); + assertTrue("Unexpected open file: " + filePath, + remainingFiles.remove(new Path(filePath))); + } + } while (batchedEntries.hasMore()); + assertTrue(remainingFiles.size() + " open files not listed!", + remainingFiles.size() == 0); + } + + private Set<Path> createFiles(FileSystem fileSystem, String fileNamePrefix, + int numFilesToCreate) throws IOException { + HashSet<Path> files = new HashSet<>(); + for (int i = 0; i < numFilesToCreate; i++) { + Path filePath = new Path(fileNamePrefix + "-" + i); + DFSTestUtil.createFile(fileSystem, filePath, 1024, (short) 3, 1); + } + return files; + } + + /** + * Verify dfsadmin -listOpenFiles command in HA mode. 
+ */ + @Test(timeout = 120000) + public void testListOpenFilesInHA() throws Exception { + fs.close(); + cluster.shutdown(); + HdfsConfiguration haConf = new HdfsConfiguration(); + haConf.setLong( + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, BATCH_SIZE); + MiniDFSCluster haCluster = + new MiniDFSCluster.Builder(haConf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(0) + .build(); + try { + HATestUtil.setFailoverConfigurations(haCluster, haConf); + FileSystem fileSystem = HATestUtil.configureFailoverFs(haCluster, haConf); + + List<ClientProtocol> namenodes = + HAUtil.getProxiesForAllNameNodesInNameservice(haConf, + HATestUtil.getLogicalHostname(haCluster)); + haCluster.transitionToActive(0); + assertTrue(HAUtil.isAtLeastOneActive(namenodes)); + + final byte[] data = new byte[1024]; + ThreadLocalRandom.current().nextBytes(data); + DFSTestUtil.createOpenFiles(fileSystem, "ha-open-file", + ((BATCH_SIZE * 4) + (BATCH_SIZE / 2))); + + final DFSAdmin dfsAdmin = new DFSAdmin(haConf); + final AtomicBoolean failoverCompleted = new AtomicBoolean(false); + final AtomicBoolean listOpenFilesError = new AtomicBoolean(false); + final int listingIntervalMsec = 250; + Thread clientThread = new Thread(new Runnable() { + @Override + public void run() { + while(!failoverCompleted.get()) { + try { + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[] {"-listOpenFiles"})); + // Sleep for some time to avoid + // flooding logs with listing. + Thread.sleep(listingIntervalMsec); + } catch (Exception e) { + listOpenFilesError.set(true); + LOG.info("Error listing open files: ", e); + break; + } + } + } + }); + clientThread.start(); + + // Let client list open files for few + // times before the NN failover. + Thread.sleep(listingIntervalMsec * 2); + + LOG.info("Shutting down Active NN0!"); + haCluster.shutdownNameNode(0); + LOG.info("Transitioning NN1 to Active!"); + haCluster.transitionToActive(1); + failoverCompleted.set(true); + + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[] {"-listOpenFiles"})); + assertFalse("Client Error!", listOpenFilesError.get()); + + clientThread.join(); + } finally { + if (haCluster != null) { + haCluster.shutdown(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc66e76a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java index 8f6b3ec..11b1620 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -23,9 +23,12 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; import com.google.common.base.Supplier; import com.google.common.collect.Lists; import org.apache.commons.lang.text.StrBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.ReconfigurationUtil; import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSClient; @@ -54,6 +57,8 @@ import 
java.io.File; import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Scanner; @@ -74,6 +79,7 @@ import static org.mockito.Mockito.when; * set/clrSpaceQuote are tested in {@link org.apache.hadoop.hdfs.TestQuota}. */ public class TestDFSAdmin { + private static final Log LOG = LogFactory.getLog(TestDFSAdmin.class); private Configuration conf = null; private MiniDFSCluster cluster; private DFSAdmin admin; @@ -478,6 +484,75 @@ public class TestDFSAdmin { } } + @Test(timeout = 300000L) + public void testListOpenFiles() throws Exception { + redirectStream(); + + final Configuration dfsConf = new HdfsConfiguration(); + dfsConf.setInt( + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); + dfsConf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + dfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, 5); + final Path baseDir = new Path( + PathUtils.getTestDir(getClass()).getAbsolutePath(), + GenericTestUtils.getMethodName()); + dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.toString()); + + final int numDataNodes = 3; + final int numClosedFiles = 25; + final int numOpenFiles = 15; + + try(MiniDFSCluster miniCluster = new MiniDFSCluster + .Builder(dfsConf) + .numDataNodes(numDataNodes).build()) { + final short replFactor = 1; + final long fileLength = 512L; + final FileSystem fs = miniCluster.getFileSystem(); + final Path parentDir = new Path("/tmp/files/"); + + fs.mkdirs(parentDir); + HashSet<Path> closedFileSet = new HashSet<>(); + for (int i = 0; i < numClosedFiles; i++) { + Path file = new Path(parentDir, "closed-file-" + i); + DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L); + closedFileSet.add(file); + } + + HashMap<Path, FSDataOutputStream> openFilesMap = new HashMap<>(); + for (int i = 0; i < numOpenFiles; i++) { + Path file = new Path(parentDir, "open-file-" + i); + DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L); + FSDataOutputStream outputStream = fs.append(file); + openFilesMap.put(file, outputStream); + } + + final DFSAdmin dfsAdmin = new DFSAdmin(dfsConf); + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[]{"-listOpenFiles"})); + verifyOpenFilesListing(closedFileSet, openFilesMap); + + for (int count = 0; count < numOpenFiles; count++) { + closedFileSet.addAll(DFSTestUtil.closeOpenFiles(openFilesMap, 1)); + resetStream(); + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[]{"-listOpenFiles"})); + verifyOpenFilesListing(closedFileSet, openFilesMap); + } + } + } + + private void verifyOpenFilesListing(HashSet<Path> closedFileSet, + HashMap<Path, FSDataOutputStream> openFilesMap) { + final String outStr = scanIntoString(out); + LOG.info("dfsadmin -listOpenFiles output: \n" + out); + for (Path closedFilePath : closedFileSet) { + assertThat(outStr, not(containsString(closedFilePath.toString() + "\n"))); + } + for (Path openFilePath : openFilesMap.keySet()) { + assertThat(outStr, is(containsString(openFilePath.toString() + "\n"))); + } + } + private void verifyNodesAndCorruptBlocks( final int numDn, final int numLiveDn, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
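----------------------------------------------------------------------
Usage note (illustrative only, not part of the commit): the sketch below shows how an HDFS superuser might consume the HdfsAdmin#listOpenFiles() API introduced by this change. The class name ListOpenFilesExample is a hypothetical placeholder; the constructor, iterator, and OpenFileEntry getters are the ones added in the diff above. The equivalent command-line invocation is `hdfs dfsadmin -listOpenFiles`.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;

/**
 * Hypothetical client-side sketch: list all files currently open for write
 * on the NameNode pointed to by fs.defaultFS. Must be run as an HDFS
 * superuser, since the underlying RPC checks superuser privilege.
 */
public class ListOpenFilesExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    HdfsAdmin admin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
    // The remote iterator fetches entries from the NameNode in batches of
    // dfs.namenode.list.openfiles.num.responses (default 1000), so the
    // listing is not an atomic snapshot of all open files.
    RemoteIterator<OpenFileEntry> it = admin.listOpenFiles();
    while (it.hasNext()) {
      OpenFileEntry entry = it.next();
      System.out.println(entry.getClientMachine() + "\t"
          + entry.getClientName() + "\t" + entry.getFilePath());
    }
  }
}
----------------------------------------------------------------------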
