This is an automated email from the ASF dual-hosted git repository.
ZanderXu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 215a1edea61 HDFS-17910. [ARR] Support async listOpenFiles for routers (#8454)
215a1edea61 is described below
commit 215a1edea61add4983cf7c58c9ebfc518adf0d6b
Author: Felix Nguyen <[email protected]>
AuthorDate: Wed May 6 10:28:00 2026 +0800
HDFS-17910. [ARR] Support async listOpenFiles for routers (#8454)
---
.../federation/router/RouterClientProtocol.java | 20 +++++++++++++--
.../router/async/RouterAsyncClientProtocol.java | 21 +++++++++++++++
.../federation/router/TestRouterListOpenFiles.java | 30 +++++++++++++++++++---
3 files changed, 66 insertions(+), 5 deletions(-)
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index aac04a97849..5d822f2e6d5 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -22,8 +22,8 @@
import static
org.apache.hadoop.hdfs.server.federation.router.FederationUtil.updateMountPointStatus;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
-import org.apache.hadoop.fs.BatchedRemoteIterator;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@@ -1986,7 +1986,23 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long
prevId,
prevId, openFilesTypes, new RemoteParam());
Map<RemoteLocation, BatchedEntries> results =
rpcClient.invokeConcurrent(locations, method, true, false, -1,
BatchedEntries.class);
+ return mergeAndSortOpenFileListResults(results);
+ }
+ /**
+ * Merges the invocation results of listOpenFiles from downstream namespaces.
+ * To ensure no entries are skipped for the next call iteration, trims off all entries with
+ * <pre>
+ * id > min([max([entry.id for entry in entries]) for entries per namespace])
+ * </pre>
+ * then sorts the filtered results by id, in ascending order.
+ * @param results invocation results of listOpenFiles from downstream namespaces
+ * @return {@link BatchedListEntries} object of merged entries
+ * @throws IOException when one file appears in different namespaces,
+ * and the path cannot resolve to a mount point
+ */
+ protected BatchedListEntries<OpenFileEntry> mergeAndSortOpenFileListResults(
+ Map<RemoteLocation, BatchedEntries> results) throws IOException {
// Get the largest inodeIds for each namespace, and the smallest inodeId of them
// then ignore all entries above this id to keep a consistent prevId for the next listOpenFiles
long minOfMax = Long.MAX_VALUE;
@@ -2040,7 +2056,7 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long
prevId,
}
List<OpenFileEntry> entryList = new ArrayList<>(routerEntries.values());
entryList.sort(Comparator.comparingLong(OpenFileEntry::getId));
- return new BatchedRemoteIterator.BatchedListEntries<>(entryList, hasMore);
+ return new BatchedListEntries<>(entryList, hasMore);
}
@Override
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java
index b64ebb31f96..b421c8f39eb 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java
@@ -20,6 +20,8 @@
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FsServerDefaults;
@@ -34,6 +36,8 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
@@ -839,6 +843,23 @@ public ReplicatedBlockStats getReplicatedBlockStats()
throws IOException {
return asyncReturn(ReplicatedBlockStats.class);
}
+ @Override
+ public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
+ EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+ List<RemoteLocation> locations = rpcServer.getLocationsForPath(path,
false, false);
+ RemoteMethod method =
+ new RemoteMethod("listOpenFiles", new Class<?>[] {long.class,
EnumSet.class, String.class},
+ prevId, openFilesTypes, new RemoteParam());
+ rpcClient.invokeConcurrent(locations, method, true, false, -1,
BatchedEntries.class);
+
+ asyncApply(o -> {
+ Map<RemoteLocation, BatchedEntries> results = (Map<RemoteLocation,
BatchedEntries>) o;
+ return mergeAndSortOpenFileListResults(results);
+ });
+ return asyncReturn(BatchedListEntries.class);
+ }
+
@Override
public DatanodeInfo[] getDatanodeReport(HdfsConstants.DatanodeReportType
type)
throws IOException {
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterListOpenFiles.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterListOpenFiles.java
index e96c7c757fe..300e2451a8f 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterListOpenFiles.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterListOpenFiles.java
@@ -28,9 +28,10 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.MethodSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BatchedRemoteIterator;
@@ -49,10 +50,14 @@
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import static
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getAdminClient;
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
+import static
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+@MethodSource("getParameters")
+@ParameterizedClass
public class TestRouterListOpenFiles {
final private static String TEST_DESTINATION_PATH =
"/TestRouterListOpenFilesDst";
final private static int NUM_SUBCLUSTERS = 2;
@@ -63,12 +68,22 @@ public class TestRouterListOpenFiles {
private static DFSClient client0;
private static DFSClient client1;
private static DFSClient routerClient;
+ private final boolean useAsync;
- @BeforeAll
- public static void setup() throws Exception {
+ public TestRouterListOpenFiles(boolean useAsyncFlag) throws Exception {
+ this.useAsync = useAsyncFlag;
+ setup(useAsyncFlag);
+ }
+
+ public static Object[] getParameters() {
+ return new Object[] {true, false};
+ }
+
+ public void setup(boolean useAsyncFlag) throws Exception {
cluster = new StateStoreDFSCluster(false, NUM_SUBCLUSTERS,
MultipleDestinationMountTableResolver.class);
Configuration conf = new
RouterConfigBuilder().stateStore().heartbeat().admin().rpc().build();
+ conf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, useAsyncFlag);
conf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, "ns0,ns1");
conf.setBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE, true);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES,
BATCH_SIZE);
@@ -120,6 +135,9 @@ public void testSingleDestination() throws Exception {
BatchedRemoteIterator.BatchedEntries<OpenFileEntry> result =
routerProtocol.listOpenFiles(0,
EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
testPath);
+ if (useAsync) {
+ result = syncReturn(BatchedRemoteIterator.BatchedEntries.class);
+ }
// Should list only the entry on ns0
assertEquals(1, result.size());
assertEquals(testPath + "/file0", result.get(0).getFilePath());
@@ -138,6 +156,9 @@ public void testMultipleDestinations() throws Exception {
BatchedRemoteIterator.BatchedEntries<OpenFileEntry> result =
routerProtocol.listOpenFiles(0,
EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
testPath);
+ if (useAsync) {
+ result = syncReturn(BatchedRemoteIterator.BatchedEntries.class);
+ }
// Should list both entries on ns0 and ns1
assertEquals(2, result.size());
assertEquals(testPath + "/file0", result.get(0).getFilePath());
@@ -157,6 +178,9 @@ public void testMultipleDestinations() throws Exception {
result =
routerProtocol.listOpenFiles(0,
EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
testPath);
+ if (useAsync) {
+ result = syncReturn(BatchedRemoteIterator.BatchedEntries.class);
+ }
// Should list one file only
assertEquals(1, result.size());
assertEquals(routerClient.getFileInfo(TEST_DESTINATION_PATH +
"/file2").getFileId(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]