Repository: hadoop Updated Branches: refs/heads/trunk f214a9961 -> fb6898095
HDFS-10480. Add an admin command to list currently open files. Contributed by Manoj Govindassamy. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fb689809 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fb689809 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fb689809 Branch: refs/heads/trunk Commit: fb68980959f95f0d89e86f91909867724ad01791 Parents: f214a99 Author: Andrew Wang <[email protected]> Authored: Thu Jun 15 14:46:55 2017 -0700 Committer: Andrew Wang <[email protected]> Committed: Thu Jun 15 14:46:55 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hdfs/DFSClient.java | 12 + .../hadoop/hdfs/DistributedFileSystem.java | 15 ++ .../apache/hadoop/hdfs/client/HdfsAdmin.java | 15 ++ .../hadoop/hdfs/protocol/ClientProtocol.java | 12 + .../hadoop/hdfs/protocol/OpenFileEntry.java | 58 +++++ .../hadoop/hdfs/protocol/OpenFilesIterator.java | 59 +++++ .../ClientNamenodeProtocolTranslatorPB.java | 23 ++ .../hadoop/hdfs/protocolPB/PBHelperClient.java | 19 +- .../src/main/proto/ClientNamenodeProtocol.proto | 18 ++ .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 + ...tNamenodeProtocolServerSideTranslatorPB.java | 20 ++ .../hdfs/server/namenode/FSNamesystem.java | 45 ++++ .../hdfs/server/namenode/LeaseManager.java | 51 +++- .../hdfs/server/namenode/NameNodeRpcServer.java | 8 + .../org/apache/hadoop/hdfs/tools/DFSAdmin.java | 60 +++++ .../src/main/resources/hdfs-default.xml | 10 + .../src/site/markdown/HDFSCommands.md | 2 + .../org/apache/hadoop/hdfs/DFSTestUtil.java | 40 +++- .../org/apache/hadoop/hdfs/TestHdfsAdmin.java | 59 +++++ .../hdfs/server/namenode/TestLeaseManager.java | 12 +- .../hdfs/server/namenode/TestListOpenFiles.java | 234 +++++++++++++++++++ .../apache/hadoop/hdfs/tools/TestDFSAdmin.java | 72 ++++++ 22 files changed, 841 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index ec142f0..51f04e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -128,6 +128,8 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator; import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; @@ -3025,4 +3027,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, Tracer getTracer() { return tracer; } + + /** + * Get a remote iterator to the open files list managed by NameNode. 
+ * + * @throws IOException + */ + public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException { + checkOpen(); + return new OpenFilesIterator(namenode, tracer); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 2f60e9d..1fd8f79 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -87,6 +87,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; @@ -2881,4 +2882,18 @@ public class DistributedFileSystem extends FileSystem { public HdfsDataOutputStreamBuilder createFile(Path path) { return new HdfsDataOutputStreamBuilder(this, path).create().overwrite(true); } + + /** + * Returns a RemoteIterator which can be used to list all open files + * currently managed by the NameNode. For large numbers of open files, + * the iterator will fetch the list in batches of configured size. + * <p/> + * Since the list is fetched in batches, it does not represent a + * consistent snapshot of all open files. + * <p/> + * This method can only be called by HDFS superusers. + */ + public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException { + return dfs.listOpenFiles(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java index 71f6a35..21de0ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; @@ -560,4 +561,18 @@ public class HdfsAdmin { dfs.setPermission(trashPath, TRASH_PERMISSION); } + /** + * Returns a RemoteIterator which can be used to list all open files + * currently managed by the NameNode. For large numbers of open files, + * the iterator will fetch the list in batches of configured size.
+ * <p/> + * Since the list is fetched in batches, it does not represent a + * consistent snapshot of all open files. + * <p/> + * This method can only be called by HDFS superusers. + */ + public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException { + return dfs.listOpenFiles(); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 82e5c32..e132e04 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -1606,4 +1606,16 @@ public interface ClientProtocol { */ @Idempotent QuotaUsage getQuotaUsage(String path) throws IOException; + + /** + * List open files in the system in batches. INode id is the cursor and the + * open files returned in a batch will have their INode ids greater than + * the cursor INode id. Open files can only be requested by the super user, + * and the list across batches is not atomic. + * + * @param prevId the cursor INode id. + * @throws IOException + */ + @Idempotent + BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFileEntry.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFileEntry.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFileEntry.java new file mode 100644 index 0000000..14e97d3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFileEntry.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * An open file entry for use by DFSAdmin commands.
+ */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class OpenFileEntry { + private final long id; + private final String filePath; + private final String clientName; + private final String clientMachine; + + public OpenFileEntry(long id, String filePath, + String clientName, String clientMachine) { + this.id = id; + this.filePath = filePath; + this.clientName = clientName; + this.clientMachine = clientMachine; + } + + public long getId() { + return id; + } + + public String getFilePath() { + return filePath; + } + + public String getClientMachine() { + return clientMachine; + } + + public String getClientName() { + return clientName; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java new file mode 100644 index 0000000..c24e585 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.BatchedRemoteIterator; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; + +/** + * OpenFilesIterator is a remote iterator that iterates over the open files list + * managed by the NameNode. Since the list is retrieved in batches, it does not + * represent a consistent view of all open files.
+ */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class OpenFilesIterator extends + BatchedRemoteIterator<Long, OpenFileEntry> { + private final ClientProtocol namenode; + private final Tracer tracer; + + public OpenFilesIterator(ClientProtocol namenode, Tracer tracer) { + super(HdfsConstants.GRANDFATHER_INODE_ID); + this.namenode = namenode; + this.tracer = tracer; + } + + @Override + public BatchedEntries<OpenFileEntry> makeRequest(Long prevId) + throws IOException { + try (TraceScope ignored = tracer.newScope("listOpenFiles")) { + return namenode.listOpenFiles(prevId); + } + } + + @Override + public Long elementToPrevKey(OpenFileEntry entry) { + return entry.getId(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index f29de15..0d517f80 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.BlocksStats; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; @@ -139,10 +140,13 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCa import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto; @@ -1752,4 +1756,23 @@ public class ClientNamenodeProtocolTranslatorPB implements throw ProtobufHelper.getRemoteException(e); } } +
+ @Override + public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) + throws IOException { + ListOpenFilesRequestProto req = + ListOpenFilesRequestProto.newBuilder().setId(prevId).build(); + try { + ListOpenFilesResponseProto response = rpcProxy.listOpenFiles(null, req); + List<OpenFileEntry> openFileEntries = + Lists.newArrayListWithCapacity(response.getEntriesCount()); + for (OpenFilesBatchResponseProto p : response.getEntriesList()) { + openFileEntries.add(PBHelperClient.convert(p)); + } + return new BatchedListEntries<>(openFileEntries, response.getHasMore()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 1716fba..b356583 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -91,6 +91,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.protocol.BlocksStats; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; @@ -120,6 +121,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEdi import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsECBlockGroupsStatsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsBlocksStatsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto; @@ -1253,6 +1255,21 @@ public class PBHelperClient { proto.getKeyName()); } + public static OpenFilesBatchResponseProto convert(OpenFileEntry + openFileEntry) { + return OpenFilesBatchResponseProto.newBuilder() + .setId(openFileEntry.getId()) + .setPath(openFileEntry.getFilePath()) + .setClientName(openFileEntry.getClientName()) + .setClientMachine(openFileEntry.getClientMachine()) + .build(); + } + + public static OpenFileEntry convert(OpenFilesBatchResponseProto proto) { + return new OpenFileEntry(proto.getId(), proto.getPath(), + proto.getClientName(), proto.getClientMachine()); + } + public static AclStatus convert(GetAclStatusResponseProto e) { AclStatusProto r = e.getResult(); AclStatus.Builder builder = new AclStatus.Builder(); @@ -2826,4 +2843,4 @@ public class PBHelperClient { } return ret; } -} \ No newline at end of file +} 
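For reference, a minimal client-side sketch of how the API added above can be consumed (not part of this patch; the NameNode URI below is an illustrative placeholder, and the caller must be an HDFS superuser):

// Sketch only: list open files through the new HdfsAdmin#listOpenFiles() API.
// The hdfs://nn.example.com:8020 URI is an assumption for illustration.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;

public class ListOpenFilesSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    HdfsAdmin admin = new HdfsAdmin(URI.create("hdfs://nn.example.com:8020"), conf);
    // The iterator pulls entries from the NameNode in batches of
    // dfs.namenode.list.openfiles.num.responses, so the overall listing
    // is not an atomic snapshot.
    RemoteIterator<OpenFileEntry> openFiles = admin.listOpenFiles();
    while (openFiles.hasNext()) {
      OpenFileEntry entry = openFiles.next();
      System.out.println(entry.getClientMachine() + "\t"
          + entry.getClientName() + "\t" + entry.getFilePath());
    }
  }
}
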
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 3b1504c..c56c0b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -777,6 +777,22 @@ message GetEditsFromTxidResponseProto { required EventsListProto eventsList = 1; } +message ListOpenFilesRequestProto { + required int64 id = 1; +} + +message OpenFilesBatchResponseProto { + required int64 id = 1; + required string path = 2; + required string clientName = 3; + required string clientMachine = 4; +} + +message ListOpenFilesResponseProto { + repeated OpenFilesBatchResponseProto entries = 1; + required bool hasMore = 2; +} + service ClientNamenodeProtocol { rpc getBlockLocations(GetBlockLocationsRequestProto) returns(GetBlockLocationsResponseProto); @@ -945,4 +961,6 @@ service ClientNamenodeProtocol { returns(GetErasureCodingCodecsResponseProto); rpc getQuotaUsage(GetQuotaUsageRequestProto) returns(GetQuotaUsageResponseProto); + rpc listOpenFiles(ListOpenFilesRequestProto) + returns(ListOpenFilesResponseProto); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 726cfb7..eaaff60 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -873,6 +873,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY; public static final int DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100; public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses"; + public static final String DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES = + "dfs.namenode.list.openfiles.num.responses"; + public static final int DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES_DEFAULT = + 1000; public static final String DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_KEY = "dfs.namenode.edekcacheloader.interval.ms"; public static final int DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT = 1000; public static final String DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY = "dfs.namenode.edekcacheloader.initial.delay.ms"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index ba59ed8..7135ff1 
100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; @@ -155,6 +156,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCa import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto; @@ -1717,4 +1720,21 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements throw new ServiceException(e); } } + + @Override + public ListOpenFilesResponseProto listOpenFiles(RpcController controller, + ListOpenFilesRequestProto req) throws ServiceException { + try { + BatchedEntries<OpenFileEntry> entries = server.listOpenFiles(req.getId()); + ListOpenFilesResponseProto.Builder builder = + ListOpenFilesResponseProto.newBuilder(); + builder.setHasMore(entries.hasMore()); + for (int i = 0; i < entries.size(); i++) { + builder.addEntries(PBHelperClient.convert(entries.get(i))); + } + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 2a611b3..3f7f1ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -91,6 +91,7 @@ import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*; import org.apache.hadoop.hdfs.protocol.BlocksStats; import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.apache.hadoop.util.Time.now; import static org.apache.hadoop.util.Time.monotonicNow; @@ -424,6 +425,9 @@ public class FSNamesystem 
implements Namesystem, FSNamesystemMBean, /** Maximum time the lock is hold to release lease. */ private final long maxLockHoldToReleaseLeaseMs; + // Batch size for open files response + private final int maxListOpenFilesResponses; + // Scan interval is not configurable. private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS); @@ -874,6 +878,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, inodeAttributeProvider = ReflectionUtils.newInstance(klass, conf); LOG.info("Using INode attribute provider: " + klass.getName()); } + this.maxListOpenFilesResponses = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES_DEFAULT + ); + Preconditions.checkArgument(maxListOpenFilesResponses > 0, + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES + + " must be a positive integer." + ); } catch(IOException e) { LOG.error(getClass().getSimpleName() + " initialization failed.", e); close(); @@ -905,6 +917,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return maxLockHoldToReleaseLeaseMs; } + public int getMaxListOpenFilesResponses() { + return maxListOpenFilesResponses; + } + void lockRetryCache() { if (retryCache != null) { retryCache.lock(); } } @@ -1714,6 +1730,35 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, blockManager.metaSave(out); } + /** + * List open files in the system in batches. prevId is the cursor INode id and + * the open files returned in a batch will have their INode ids greater than + * this cursor. Open files can only be requested by the super user, and the + * list across batches does not represent a consistent view of all open files. + * + * @param prevId the cursor INode id.
+ * @throws IOException + */ + BatchedListEntries<OpenFileEntry> listOpenFiles(long prevId) + throws IOException { + final String operationName = "listOpenFiles"; + checkSuperuserPrivilege(); + checkOperation(OperationCategory.READ); + readLock(); + BatchedListEntries<OpenFileEntry> batchedListEntries; + try { + checkOperation(OperationCategory.READ); + batchedListEntries = leaseManager.getUnderConstructionFiles(prevId); + } catch (AccessControlException e) { + logAuditEvent(false, operationName, null); + throw e; + } finally { + readUnlock(operationName); + } + logAuditEvent(true, operationName, null); + return batchedListEntries; + } + private String metaSaveAsString() { StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java index 8695d63..38cdbb3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.PriorityQueue; @@ -40,7 +39,9 @@ import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.util.Daemon; @@ -94,7 +95,7 @@ public class LeaseManager { } }); // INodeID -> Lease - private final HashMap<Long, Lease> leasesById = new HashMap<>(); + private final TreeMap<Long, Lease> leasesById = new TreeMap<>(); private Daemon lmthread; private volatile boolean shouldRunMonitor; @@ -245,6 +246,52 @@ public class LeaseManager { return iipSet; } + /** + * Get a batch of under construction files from the currently active leases. + * File INodeID is the cursor used to fetch new batch of results and the + * batch size is configurable using below config param. Since the list is + * fetched in batches, it does not represent a consistent view of all + * open files. 
+ * + * @see org.apache.hadoop.hdfs.DFSConfigKeys#DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES + * @param prevId the INodeID cursor + * @throws IOException + */ + public BatchedListEntries<OpenFileEntry> getUnderConstructionFiles( + final long prevId) throws IOException { + assert fsnamesystem.hasReadLock(); + SortedMap<Long, Lease> remainingLeases; + synchronized (this) { + remainingLeases = leasesById.tailMap(prevId, false); + } + Collection<Long> inodeIds = remainingLeases.keySet(); + final int numResponses = Math.min( + this.fsnamesystem.getMaxListOpenFilesResponses(), inodeIds.size()); + final List<OpenFileEntry> openFileEntries = + Lists.newArrayListWithExpectedSize(numResponses); + + int count = 0; + for (Long inodeId: inodeIds) { + final INodeFile inodeFile = + fsnamesystem.getFSDirectory().getInode(inodeId).asFile(); + if (!inodeFile.isUnderConstruction()) { + LOG.warn("The file " + inodeFile.getFullPathName() + + " is not under construction but has lease."); + continue; + } + openFileEntries.add(new OpenFileEntry( + inodeFile.getId(), inodeFile.getFullPathName(), + inodeFile.getFileUnderConstructionFeature().getClientName(), + inodeFile.getFileUnderConstructionFeature().getClientMachine())); + count++; + if (count >= numResponses) { + break; + } + } + boolean hasMore = (numResponses < remainingLeases.size()); + return new BatchedListEntries<>(openFileEntries, hasMore); + } + /** @return the lease containing src */ public synchronized Lease getLease(INodeFile src) {return leasesById.get(src.getId());} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index fff29df..e11a546 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -110,6 +110,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; @@ -1309,6 +1310,13 @@ public class NameNodeRpcServer implements NamenodeProtocols { } @Override // ClientProtocol + public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) + throws IOException { + checkNNStartup(); + return namesystem.listOpenFiles(prevId); + } + + @Override // ClientProtocol public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) throws IOException { checkNNStartup(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java index d82dfc4..70509d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java @@ -49,7 +49,9 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsShell; import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.FsTracer; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.shell.Command; import org.apache.hadoop.fs.shell.CommandFormat; import org.apache.hadoop.fs.shell.PathData; @@ -73,6 +75,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator; import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotException; @@ -455,6 +459,7 @@ public class DFSAdmin extends FsShell { "\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" + "\t[-metasave filename]\n" + "\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]\n" + + "\t[-listOpenFiles]\n" + "\t[-help [cmd]]\n"; /** @@ -882,6 +887,45 @@ public class DFSAdmin extends FsShell { } /** + * Command to list all the open files currently managed by NameNode. + * Usage: hdfs dfsadmin -listOpenFiles + * + * @throws IOException + */ + public int listOpenFiles() throws IOException { + DistributedFileSystem dfs = getDFS(); + Configuration dfsConf = dfs.getConf(); + URI dfsUri = dfs.getUri(); + boolean isHaEnabled = HAUtilClient.isLogicalUri(dfsConf, dfsUri); + + RemoteIterator<OpenFileEntry> openFilesRemoteIterator; + if (isHaEnabled) { + ProxyAndInfo<ClientProtocol> proxy = NameNodeProxies.createNonHAProxy( + dfsConf, HAUtil.getAddressOfActive(getDFS()), ClientProtocol.class, + UserGroupInformation.getCurrentUser(), false); + openFilesRemoteIterator = new OpenFilesIterator(proxy.getProxy(), + FsTracer.get(dfsConf)); + } else { + openFilesRemoteIterator = dfs.listOpenFiles(); + } + printOpenFiles(openFilesRemoteIterator); + return 0; + } + + private void printOpenFiles(RemoteIterator<OpenFileEntry> openFilesIterator) + throws IOException { + System.out.println(String.format("%-20s\t%-20s\t%s", "Client Host", + "Client Name", "Open File Path")); + while (openFilesIterator.hasNext()) { + OpenFileEntry openFileEntry = openFilesIterator.next(); + System.out.println(String.format("%-20s\t%-20s\t%20s", + openFileEntry.getClientMachine(), + openFileEntry.getClientName(), + openFileEntry.getFilePath())); + } + } + + /** * Command to ask the namenode to set the balancer bandwidth for all of the * datanodes. 
* Usage: hdfs dfsadmin -setBalancerBandwidth bandwidth @@ -1138,6 +1182,10 @@ public class DFSAdmin extends FsShell { + "\tIf 'incremental' is specified, it will be an incremental\n" + "\tblock report; otherwise, it will be a full block report.\n"; + String listOpenFiles = "-listOpenFiles\n" + + "\tList all open files currently managed by the NameNode along\n" + + "\twith client name and client machine accessing them.\n"; + String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" + "\t\tis specified.\n"; @@ -1203,6 +1251,8 @@ public class DFSAdmin extends FsShell { System.out.println(evictWriters); } else if ("getDatanodeInfo".equalsIgnoreCase(cmd)) { System.out.println(getDatanodeInfo); + } else if ("listOpenFiles".equalsIgnoreCase(cmd)) { + System.out.println(listOpenFiles); } else if ("help".equals(cmd)) { System.out.println(help); } else { @@ -1238,6 +1288,7 @@ public class DFSAdmin extends FsShell { System.out.println(evictWriters); System.out.println(getDatanodeInfo); System.out.println(triggerBlockReport); + System.out.println(listOpenFiles); System.out.println(help); System.out.println(); ToolRunner.printGenericCommandUsage(System.out); @@ -1879,6 +1930,8 @@ public class DFSAdmin extends FsShell { } else if ("-triggerBlockReport".equals(cmd)) { System.err.println("Usage: hdfs dfsadmin" + " [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]"); + } else if ("-listOpenFiles".equals(cmd)) { + System.err.println("Usage: hdfs dfsadmin [-listOpenFiles]"); } else { System.err.println("Usage: hdfs dfsadmin"); System.err.println("Note: Administrative commands can only be run as the HDFS superuser."); @@ -2032,6 +2085,11 @@ public class DFSAdmin extends FsShell { printUsage(cmd); return exitCode; } + } else if ("-listOpenFiles".equals(cmd)) { + if (argv.length != 1) { + printUsage(cmd); + return exitCode; + } } // initialize DFSAdmin @@ -2113,6 +2171,8 @@ public class DFSAdmin extends FsShell { exitCode = reconfig(argv, i); } else if ("-triggerBlockReport".equals(cmd)) { exitCode = triggerBlockReport(argv); + } else if ("-listOpenFiles".equals(cmd)) { + exitCode = listOpenFiles(); } else if ("-help".equals(cmd)) { if (i < argv.length) { printHelp(argv[i]); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 82090e6..dbf78fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2788,6 +2788,16 @@ </description> </property> + <property> + <name>dfs.namenode.list.openfiles.num.responses</name> + <value>1000</value> + <description> + When listing open files, the maximum number of open files that will be + returned in a single batch. Fetching the list incrementally in batches + improves namenode performance. 
+ </description> + </property> + <property> <name>dfs.namenode.edekcacheloader.interval.ms</name> <value>1000</value> http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md index ffffee9..be0f89e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md @@ -370,6 +370,7 @@ Usage: hdfs dfsadmin [-getDatanodeInfo <datanode_host:ipc_port>] hdfs dfsadmin [-metasave filename] hdfs dfsadmin [-triggerBlockReport [-incremental] <datanode_host:ipc_port>] + hdfs dfsadmin [-listOpenFiles] hdfs dfsadmin [-help [cmd]] | COMMAND\_OPTION | Description | @@ -406,6 +407,7 @@ Usage: | `-getDatanodeInfo` \<datanode\_host:ipc\_port\> | Get the information about the given datanode. See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo) for the detail. | | `-metasave` filename | Save Namenode's primary data structures to *filename* in the directory specified by hadoop.log.dir property. *filename* is overwritten if it exists. *filename* will contain one line for each of the following<br/>1. Datanodes heart beating with Namenode<br/>2. Blocks waiting to be replicated<br/>3. Blocks currently being replicated<br/>4. Blocks waiting to be deleted | | `-triggerBlockReport` `[-incremental]` \<datanode\_host:ipc\_port\> | Trigger a block report for the given datanode. If 'incremental' is specified, it will be otherwise, it will be a full block report. | +| `-listOpenFiles` | List all open files currently managed by the NameNode along with client name and client machine accessing them. | | `-help` [cmd] | Displays help for the given command or all commands if none is specified. | Runs a HDFS dfsadmin client. http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 5075c05..f3572ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -62,8 +62,10 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.UUID; @@ -1863,8 +1865,8 @@ public class DFSTestUtil { }, 100, waitTime); } - /** - * Change the length of a block at datanode dnIndex + /** + * Change the length of a block at datanode dnIndex. 
*/ public static boolean changeReplicaLength(MiniDFSCluster cluster, ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException { @@ -2249,4 +2251,38 @@ public class DFSTestUtil { assertFalse("File in trash : " + trashPath, fs.exists(trashPath)); } } + + public static Map<Path, FSDataOutputStream> createOpenFiles(FileSystem fs, + String filePrefix, int numFilesToCreate) throws IOException { + final Map<Path, FSDataOutputStream> filesCreated = new HashMap<>(); + final byte[] buffer = new byte[(int) (1024 * 1.75)]; + final Random rand = new Random(0xFEED0BACL); + for (int i = 0; i < numFilesToCreate; i++) { + Path file = new Path("/" + filePrefix + "-" + i); + FSDataOutputStream stm = fs.create(file, true, 1024, (short) 1, 1024); + rand.nextBytes(buffer); + stm.write(buffer); + filesCreated.put(file, stm); + } + return filesCreated; + } + + public static HashSet<Path> closeOpenFiles( + HashMap<Path, FSDataOutputStream> openFilesMap, + int numFilesToClose) throws IOException { + HashSet<Path> closedFiles = new HashSet<>(); + for (Iterator<Entry<Path, FSDataOutputStream>> it = + openFilesMap.entrySet().iterator(); it.hasNext();) { + Entry<Path, FSDataOutputStream> entry = it.next(); + LOG.info("Closing file: " + entry.getKey()); + entry.getValue().close(); + closedFiles.add(entry.getKey()); + it.remove(); + numFilesToClose--; + if (numFilesToClose == 0) { + break; + } + } + return closedFiles; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java index fe20c68..685ea8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java @@ -18,12 +18,14 @@ package org.apache.hadoop.hdfs; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.HashMap; import java.util.HashSet; import java.util.Set; @@ -31,11 +33,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.JavaKeyStoreProvider; import org.apache.hadoop.fs.BlockStoragePolicySpi; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.junit.After; import org.junit.Assert; @@ -49,11 +54,15 @@ public class TestHdfsAdmin { private static final Path TEST_PATH = new Path("/test"); private static final short REPL = 1; private static final int SIZE = 128; + private static final int OPEN_FILES_BATCH_SIZE = 5; private final Configuration conf = new Configuration(); private MiniDFSCluster cluster; @Before public void setUpCluster() throws IOException { + conf.setLong( + 
DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, + OPEN_FILES_BATCH_SIZE); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); cluster.waitActive(); } @@ -205,4 +214,54 @@ public class TestHdfsAdmin { Assert.assertNotNull("should not return null for an encrypted cluster", hdfsAdmin.getKeyProvider()); } + + @Test(timeout = 120000L) + public void testListOpenFiles() throws IOException { + HashSet<Path> closedFileSet = new HashSet<>(); + HashMap<Path, FSDataOutputStream> openFileMap = new HashMap<>(); + FileSystem fs = FileSystem.get(conf); + verifyOpenFiles(closedFileSet, openFileMap); + + int numClosedFiles = OPEN_FILES_BATCH_SIZE * 4; + int numOpenFiles = (OPEN_FILES_BATCH_SIZE * 3) + 1; + for (int i = 0; i < numClosedFiles; i++) { + Path filePath = new Path("/closed-file-" + i); + DFSTestUtil.createFile(fs, filePath, SIZE, REPL, 0); + closedFileSet.add(filePath); + } + verifyOpenFiles(closedFileSet, openFileMap); + + openFileMap.putAll( + DFSTestUtil.createOpenFiles(fs, "open-file-1", numOpenFiles)); + verifyOpenFiles(closedFileSet, openFileMap); + + closedFileSet.addAll(DFSTestUtil.closeOpenFiles(openFileMap, + openFileMap.size() / 2)); + verifyOpenFiles(closedFileSet, openFileMap); + + openFileMap.putAll( + DFSTestUtil.createOpenFiles(fs, "open-file-2", 10)); + verifyOpenFiles(closedFileSet, openFileMap); + + while(openFileMap.size() > 0) { + closedFileSet.addAll(DFSTestUtil.closeOpenFiles(openFileMap, 1)); + verifyOpenFiles(closedFileSet, openFileMap); + } + } + + private void verifyOpenFiles(HashSet<Path> closedFiles, + HashMap<Path, FSDataOutputStream> openFileMap) throws IOException { + HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf); + HashSet<Path> openFiles = new HashSet<>(openFileMap.keySet()); + RemoteIterator<OpenFileEntry> openFilesRemoteItr = + hdfsAdmin.listOpenFiles(); + while (openFilesRemoteItr.hasNext()) { + String filePath = openFilesRemoteItr.next().getFilePath(); + assertFalse(filePath + " should not be listed under open files!", + closedFiles.contains(filePath)); + assertTrue(filePath + " is not listed under open files!", + openFiles.remove(new Path(filePath))); + } + assertTrue("Not all open files are listed!", openFiles.isEmpty()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java index 74752f9..55bc7c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java @@ -40,6 +40,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -190,6 +191,7 @@ public class TestLeaseManager { @Test (timeout = 60000) public void testInodeWithLeases() throws Exception { FSNamesystem fsNamesystem = makeMockFsNameSystem(); + when(fsNamesystem.getMaxListOpenFilesResponses()).thenReturn(1024); FSDirectory fsDirectory = fsNamesystem.getFSDirectory(); LeaseManager lm = new LeaseManager(fsNamesystem); Set<Long> 
iNodeIds = new HashSet<>(Arrays.asList( @@ -208,6 +210,7 @@ public class TestLeaseManager { for (Long iNodeId : iNodeIds) { INodeFile iNodeFile = stubInodeFile(iNodeId); + iNodeFile.toUnderConstruction("hbase", "gce-100"); iNodeFile.setParent(rootInodeDirectory); when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile); lm.addLease("holder_" + iNodeId, iNodeId); @@ -230,6 +233,7 @@ public class TestLeaseManager { @Test (timeout = 240000) public void testInodeWithLeasesAtScale() throws Exception { FSNamesystem fsNamesystem = makeMockFsNameSystem(); + when(fsNamesystem.getMaxListOpenFilesResponses()).thenReturn(4096); FSDirectory fsDirectory = fsNamesystem.getFSDirectory(); LeaseManager lm = new LeaseManager(fsNamesystem); @@ -275,7 +279,7 @@ public class TestLeaseManager { private void testInodeWithLeasesAtScaleImpl(final LeaseManager leaseManager, final FSDirectory fsDirectory, INodeDirectory ancestorDirectory, - int scale) { + int scale) throws IOException { verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0); Set<Long> iNodeIds = new HashSet<>(); @@ -284,6 +288,7 @@ public class TestLeaseManager { } for (Long iNodeId : iNodeIds) { INodeFile iNodeFile = stubInodeFile(iNodeId); + iNodeFile.toUnderConstruction("hbase", "gce-100"); iNodeFile.setParent(ancestorDirectory); when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile); leaseManager.addLease("holder_" + iNodeId, iNodeId); @@ -386,13 +391,16 @@ public class TestLeaseManager { private void verifyINodeLeaseCounts(final LeaseManager leaseManager, INodeDirectory ancestorDirectory, int iNodeIdWithLeaseCount, - int iNodeWithLeaseCount, int iNodeUnderAncestorLeaseCount) { + int iNodeWithLeaseCount, int iNodeUnderAncestorLeaseCount) + throws IOException { assertEquals(iNodeIdWithLeaseCount, leaseManager.getINodeIdWithLeases().size()); assertEquals(iNodeWithLeaseCount, leaseManager.getINodeWithLeases().size()); assertEquals(iNodeUnderAncestorLeaseCount, leaseManager.getINodeWithLeases(ancestorDirectory).size()); + assertEquals(iNodeIdWithLeaseCount, + leaseManager.getUnderConstructionFiles(0).size()); } private Map<String, INode> createINodeTree(INodeDirectory parentDir, http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java new file mode 100644 index 0000000..b290194 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HAUtil; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.tools.DFSAdmin; +import org.apache.hadoop.util.ToolRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Verify open files listing. + */ +public class TestListOpenFiles { + private static final int NUM_DATA_NODES = 3; + private static final int BATCH_SIZE = 5; + private static MiniDFSCluster cluster = null; + private static DistributedFileSystem fs = null; + private static NamenodeProtocols nnRpc = null; + private static final Log LOG = LogFactory.getLog(TestListOpenFiles.class); + + @Before + public void setUp() throws IOException { + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + conf.setLong( + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, BATCH_SIZE); + cluster = new MiniDFSCluster.Builder(conf). 
+ numDataNodes(NUM_DATA_NODES).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + nnRpc = cluster.getNameNodeRpc(); + } + + @After + public void tearDown() throws IOException { + if (fs != null) { + fs.close(); + } + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test(timeout = 120000L) + public void testListOpenFilesViaNameNodeRPC() throws Exception { + HashMap<Path, FSDataOutputStream> openFiles = new HashMap<>(); + createFiles(fs, "closed", 10); + verifyOpenFiles(openFiles); + + BatchedEntries<OpenFileEntry> openFileEntryBatchedEntries = + nnRpc.listOpenFiles(0); + assertTrue("Open files list should be empty!", + openFileEntryBatchedEntries.size() == 0); + + openFiles.putAll( + DFSTestUtil.createOpenFiles(fs, "open-1", 1)); + verifyOpenFiles(openFiles); + + openFiles.putAll( + DFSTestUtil.createOpenFiles(fs, "open-2", + (BATCH_SIZE * 2 + BATCH_SIZE / 2))); + verifyOpenFiles(openFiles); + + DFSTestUtil.closeOpenFiles(openFiles, openFiles.size() / 2); + verifyOpenFiles(openFiles); + + openFiles.putAll( + DFSTestUtil.createOpenFiles(fs, "open-3", (BATCH_SIZE * 5))); + verifyOpenFiles(openFiles); + + while(openFiles.size() > 0) { + DFSTestUtil.closeOpenFiles(openFiles, 1); + verifyOpenFiles(openFiles); + } + } + + private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles) + throws IOException { + HashSet<Path> remainingFiles = new HashSet<>(openFiles.keySet()); + OpenFileEntry lastEntry = null; + BatchedEntries<OpenFileEntry> batchedEntries; + do { + if (lastEntry == null) { + batchedEntries = nnRpc.listOpenFiles(0); + } else { + batchedEntries = nnRpc.listOpenFiles(lastEntry.getId()); + } + assertTrue("Incorrect open files list size!", + batchedEntries.size() <= BATCH_SIZE); + for (int i = 0; i < batchedEntries.size(); i++) { + lastEntry = batchedEntries.get(i); + String filePath = lastEntry.getFilePath(); + LOG.info("OpenFile: " + filePath); + assertTrue("Unexpected open file: " + filePath, + remainingFiles.remove(new Path(filePath))); + } + } while (batchedEntries.hasMore()); + assertTrue(remainingFiles.size() + " open files not listed!", + remainingFiles.size() == 0); + } + + private Set<Path> createFiles(FileSystem fileSystem, String fileNamePrefix, + int numFilesToCreate) throws IOException { + HashSet<Path> files = new HashSet<>(); + for (int i = 0; i < numFilesToCreate; i++) { + Path filePath = new Path(fileNamePrefix + "-" + i); + DFSTestUtil.createFile(fileSystem, filePath, 1024, (short) 3, 1); + } + return files; + } + + /** + * Verify dfsadmin -listOpenFiles command in HA mode. 
+ */ + @Test(timeout = 120000) + public void testListOpenFilesInHA() throws Exception { + fs.close(); + cluster.shutdown(); + HdfsConfiguration haConf = new HdfsConfiguration(); + haConf.setLong( + DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, BATCH_SIZE); + MiniDFSCluster haCluster = + new MiniDFSCluster.Builder(haConf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(0) + .build(); + try { + HATestUtil.setFailoverConfigurations(haCluster, haConf); + FileSystem fileSystem = HATestUtil.configureFailoverFs(haCluster, haConf); + + List<ClientProtocol> namenodes = + HAUtil.getProxiesForAllNameNodesInNameservice(haConf, + HATestUtil.getLogicalHostname(haCluster)); + haCluster.transitionToActive(0); + assertTrue(HAUtil.isAtLeastOneActive(namenodes)); + + final byte[] data = new byte[1024]; + ThreadLocalRandom.current().nextBytes(data); + DFSTestUtil.createOpenFiles(fileSystem, "ha-open-file", + ((BATCH_SIZE * 4) + (BATCH_SIZE / 2))); + + final DFSAdmin dfsAdmin = new DFSAdmin(haConf); + final AtomicBoolean failoverCompleted = new AtomicBoolean(false); + final AtomicBoolean listOpenFilesError = new AtomicBoolean(false); + final int listingIntervalMsec = 250; + Thread clientThread = new Thread(new Runnable() { + @Override + public void run() { + while(!failoverCompleted.get()) { + try { + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[] {"-listOpenFiles"})); + // Sleep for some time to avoid + // flooding logs with listing. + Thread.sleep(listingIntervalMsec); + } catch (Exception e) { + listOpenFilesError.set(true); + LOG.info("Error listing open files: ", e); + break; + } + } + } + }); + clientThread.start(); + + // Let client list open files for few + // times before the NN failover. + Thread.sleep(listingIntervalMsec * 2); + + LOG.info("Shutting down Active NN0!"); + haCluster.shutdownNameNode(0); + LOG.info("Transitioning NN1 to Active!"); + haCluster.transitionToActive(1); + failoverCompleted.set(true); + + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[] {"-listOpenFiles"})); + assertFalse("Client Error!", listOpenFilesError.get()); + + clientThread.join(); + } finally { + if (haCluster != null) { + haCluster.shutdown(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb689809/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java index a23fe81..2ef45e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.ReconfigurationUtil; import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSClient; @@ -60,6 +61,8 @@ import java.io.File; import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Scanner; import java.util.concurrent.TimeoutException; @@ -593,6 
+596,75 @@ public class TestDFSAdmin { } } + @Test(timeout = 300000L) + public void testListOpenFiles() throws Exception { + redirectStream(); + + final Configuration dfsConf = new HdfsConfiguration(); + dfsConf.setInt( + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); + dfsConf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); + dfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, 5); + final Path baseDir = new Path( + PathUtils.getTestDir(getClass()).getAbsolutePath(), + GenericTestUtils.getMethodName()); + dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.toString()); + + final int numDataNodes = 3; + final int numClosedFiles = 25; + final int numOpenFiles = 15; + + try(MiniDFSCluster miniCluster = new MiniDFSCluster + .Builder(dfsConf) + .numDataNodes(numDataNodes).build()) { + final short replFactor = 1; + final long fileLength = 512L; + final FileSystem fs = miniCluster.getFileSystem(); + final Path parentDir = new Path("/tmp/files/"); + + fs.mkdirs(parentDir); + HashSet<Path> closedFileSet = new HashSet<>(); + for (int i = 0; i < numClosedFiles; i++) { + Path file = new Path(parentDir, "closed-file-" + i); + DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L); + closedFileSet.add(file); + } + + HashMap<Path, FSDataOutputStream> openFilesMap = new HashMap<>(); + for (int i = 0; i < numOpenFiles; i++) { + Path file = new Path(parentDir, "open-file-" + i); + DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L); + FSDataOutputStream outputStream = fs.append(file); + openFilesMap.put(file, outputStream); + } + + final DFSAdmin dfsAdmin = new DFSAdmin(dfsConf); + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[]{"-listOpenFiles"})); + verifyOpenFilesListing(closedFileSet, openFilesMap); + + for (int count = 0; count < numOpenFiles; count++) { + closedFileSet.addAll(DFSTestUtil.closeOpenFiles(openFilesMap, 1)); + resetStream(); + assertEquals(0, ToolRunner.run(dfsAdmin, + new String[]{"-listOpenFiles"})); + verifyOpenFilesListing(closedFileSet, openFilesMap); + } + } + } + + private void verifyOpenFilesListing(HashSet<Path> closedFileSet, + HashMap<Path, FSDataOutputStream> openFilesMap) { + final String outStr = scanIntoString(out); + LOG.info("dfsadmin -listOpenFiles output: \n" + out); + for (Path closedFilePath : closedFileSet) { + assertThat(outStr, not(containsString(closedFilePath.toString() + "\n"))); + } + for (Path openFilePath : openFilesMap.keySet()) { + assertThat(outStr, is(containsString(openFilePath.toString() + "\n"))); + } + } + private void verifyNodesAndCorruptBlocks( final int numDn, final int numLiveDn, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
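For readers following this change, below is a minimal client-side sketch of how the open-files listing introduced by HDFS-10480 can be consumed; it is illustrative only and not part of the commit, and the fs.defaultFS address and class name used here are assumptions for the example. Only methods exercised by the tests above (OpenFileEntry#getId(), OpenFileEntry#getFilePath()) are used.

// Sketch only (not part of this commit): page through the open-files listing
// via DistributedFileSystem#listOpenFiles(). The NameNode URI is a placeholder.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;

public class ListOpenFilesSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://nn.example.com:8020"); // assumed address
    DistributedFileSystem dfs =
        (DistributedFileSystem) FileSystem.get(conf);
    // Entries are fetched lazily in batches sized by
    // DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, so the listing
    // is not a consistent snapshot; the caller must be an HDFS superuser.
    RemoteIterator<OpenFileEntry> openFiles = dfs.listOpenFiles();
    while (openFiles.hasNext()) {
      OpenFileEntry entry = openFiles.next();
      System.out.println(entry.getId() + "\t" + entry.getFilePath());
    }
  }
}

The same listing is exposed to administrators as "hdfs dfsadmin -listOpenFiles", which TestDFSAdmin#testListOpenFiles and TestListOpenFiles above drive through ToolRunner against the new NameNode RPC.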
