This is an automated email from the ASF dual-hosted git repository.
keepromise pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8542fb8c915 HDFS-17632. RBF: Support listOpenFiles for routers (#8072). Contributed by kokonguyen191.
8542fb8c915 is described below
commit 8542fb8c915955ade23d6f2a58d4690d224c34a5
Author: Felix Nguyen <[email protected]>
AuthorDate: Mon Jan 26 10:23:25 2026 +0800
HDFS-17632. RBF: Support listOpenFiles for routers (#8072). Contributed by kokonguyen191.
Reviewed-by: Jian Zhang <[email protected]>
Reviewed-by: He Xiaoqiao <[email protected]>
---
.../federation/router/RouterClientProtocol.java | 66 +++++-
.../federation/router/TestRouterListOpenFiles.java | 239 +++++++++++++++++++++
.../hadoop/hdfs/server/namenode/FSDirectory.java | 8 +-
3 files changed, 309 insertions(+), 4 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index 41454458eb8..aac04a97849 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -22,6 +22,7 @@
import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.updateMountPointStatus;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
@@ -83,6 +84,7 @@
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
import org.apache.hadoop.hdfs.server.federation.router.async.AsyncErasureCoding;
@@ -1977,8 +1979,68 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
-    return null;
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false, false);
+    RemoteMethod method =
+        new RemoteMethod("listOpenFiles", new Class<?>[] {long.class, EnumSet.class, String.class},
+            prevId, openFilesTypes, new RemoteParam());
+    Map<RemoteLocation, BatchedEntries> results =
+        rpcClient.invokeConcurrent(locations, method, true, false, -1, BatchedEntries.class);
+
+    // Get the largest inodeIds for each namespace, and the smallest inodeId of them
+    // then ignore all entries above this id to keep a consistent prevId for the next listOpenFiles
+    long minOfMax = Long.MAX_VALUE;
+    for (BatchedEntries nsEntries : results.values()) {
+      // Only need to care about namespaces that still have more files to report
+      if (!nsEntries.hasMore()) {
+        continue;
+      }
+      long max = 0;
+      for (int i = 0; i < nsEntries.size(); i++) {
+        max = Math.max(max, ((OpenFileEntry) nsEntries.get(i)).getId());
+      }
+      minOfMax = Math.min(minOfMax, max);
+    }
+    // Concatenate all entries into one result, sorted by inodeId
+    boolean hasMore = false;
+    Map<String, OpenFileEntry> routerEntries = new HashMap<>();
+    Map<String, RemoteLocation> resolvedPaths = new HashMap<>();
+    for (Map.Entry<RemoteLocation, BatchedEntries> entry : results.entrySet()) {
+      BatchedEntries nsEntries = entry.getValue();
+      hasMore |= nsEntries.hasMore();
+      for (int i = 0; i < nsEntries.size(); i++) {
+        OpenFileEntry ofe = (OpenFileEntry) nsEntries.get(i);
+        if (ofe.getId() > minOfMax) {
+          hasMore = true;
+          break;
+        }
+        RemoteLocation remoteLoc = entry.getKey();
+        String routerPath = ofe.getFilePath().replaceFirst(remoteLoc.getDest(), remoteLoc.getSrc());
+        OpenFileEntry newEntry =
+            new OpenFileEntry(ofe.getId(), routerPath, ofe.getClientName(),
+                ofe.getClientMachine());
+        // An existing file already resolves to the same path.
+        // Resolve according to mount table and keep the best path.
+        if (resolvedPaths.containsKey(routerPath)) {
+          PathLocation pathLoc = subclusterResolver.getDestinationForPath(routerPath);
+          List<String> namespaces = pathLoc.getDestinations().stream().map(
+              RemoteLocation::getNameserviceId).collect(
+              Collectors.toList());
+          int existingIdx = namespaces.indexOf(resolvedPaths.get(routerPath).getNameserviceId());
+          int currentIdx = namespaces.indexOf(remoteLoc.getNameserviceId());
+          if (currentIdx < existingIdx && currentIdx != -1) {
+            routerEntries.put(routerPath, newEntry);
+            resolvedPaths.put(routerPath, remoteLoc);
+          }
+        } else {
+          routerEntries.put(routerPath, newEntry);
+          resolvedPaths.put(routerPath, remoteLoc);
+        }
+      }
+    }
+    List<OpenFileEntry> entryList = new ArrayList<>(routerEntries.values());
+    entryList.sort(Comparator.comparingLong(OpenFileEntry::getId));
+    return new BatchedRemoteIterator.BatchedListEntries<>(entryList, hasMore);
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterListOpenFiles.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterListOpenFiles.java
new file mode 100644
index 00000000000..e96c7c757fe
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterListOpenFiles.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getAdminClient;
+import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestRouterListOpenFiles {
+ final private static String TEST_DESTINATION_PATH = "/TestRouterListOpenFilesDst";
+ final private static int NUM_SUBCLUSTERS = 2;
+ final private static int BATCH_SIZE = 3;
+ private static StateStoreDFSCluster cluster;
+ private static MiniRouterDFSCluster.RouterContext routerContext;
+ private static RouterClientProtocol routerProtocol;
+ private static DFSClient client0;
+ private static DFSClient client1;
+ private static DFSClient routerClient;
+
+ @BeforeAll
+ public static void setup() throws Exception {
+ cluster = new StateStoreDFSCluster(false, NUM_SUBCLUSTERS,
+ MultipleDestinationMountTableResolver.class);
+ Configuration conf = new RouterConfigBuilder().stateStore().heartbeat().admin().rpc().build();
+ conf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, "ns0,ns1");
+ conf.setBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE, true);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, BATCH_SIZE);
+ cluster.addRouterOverrides(conf);
+ cluster.startCluster(conf);
+ cluster.startRouters();
+ cluster.waitClusterUp();
+ routerContext = cluster.getRandomRouter();
+ routerProtocol = routerContext.getRouterRpcServer().getClientProtocolModule();
+ routerClient = routerContext.getClient();
+ client0 = cluster.getNamenode("ns0", null).getClient();
+ client1 = cluster.getNamenode("ns1", null).getClient();
+ }
+
+ @AfterAll
+ public static void cleanup() {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ @BeforeEach
+ public void resetInodeId() throws IOException {
+ cluster.getNamenode("ns0",
null).getNamenode().getNamesystem().getFSDirectory()
+ .resetLastInodeIdWithoutChecking(12345);
+ cluster.getNamenode("ns1",
null).getNamenode().getNamesystem().getFSDirectory()
+ .resetLastInodeIdWithoutChecking(12345);
+ // Create 2 dirs with the same name on 2 different nss
+ client0.mkdirs(TEST_DESTINATION_PATH);
+ client1.mkdirs(TEST_DESTINATION_PATH);
+ }
+
+ @AfterEach
+ public void cleanupNamespaces() throws IOException {
+ client0.delete("/", true);
+ client1.delete("/", true);
+ }
+
+ @Test
+ public void testSingleDestination() throws Exception {
+ String testPath = "/" + getMethodName();
+ createMountTableEntry(testPath, Collections.singletonList("ns0"));
+
+ // Open 2 files with different names
+ OutputStream os0 = client0.create(TEST_DESTINATION_PATH + "/file0", true);
+ OutputStream os1 = client1.create(TEST_DESTINATION_PATH + "/file1", true);
+
+ BatchedRemoteIterator.BatchedEntries<OpenFileEntry> result =
+ routerProtocol.listOpenFiles(0, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
+ testPath);
+ // Should list only the entry on ns0
+ assertEquals(1, result.size());
+ assertEquals(testPath + "/file0", result.get(0).getFilePath());
+ os0.close();
+ os1.close();
+ }
+
+ @Test
+ public void testMultipleDestinations() throws Exception {
+ String testPath = "/" + getMethodName();
+ createMountTableEntry(testPath, cluster.getNameservices());
+
+ // Open 2 files with different names
+ OutputStream os0 = client0.create(TEST_DESTINATION_PATH + "/file0", true);
+ OutputStream os1 = client1.create(TEST_DESTINATION_PATH + "/file1", true);
+ BatchedRemoteIterator.BatchedEntries<OpenFileEntry> result =
+ routerProtocol.listOpenFiles(0, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
+ testPath);
+ // Should list both entries on ns0 and ns1
+ assertEquals(2, result.size());
+ assertEquals(testPath + "/file0", result.get(0).getFilePath());
+ assertEquals(testPath + "/file1", result.get(1).getFilePath());
+ RemoteIterator<OpenFileEntry> ite = routerClient.listOpenFiles(testPath);
+ while (ite.hasNext()) {
+ OpenFileEntry ofe = ite.next();
+ assertTrue(ofe.getFilePath().equals(testPath + "/file0") || ofe.getFilePath()
+ .equals(testPath + "/file1"));
+ }
+ os0.close();
+ os1.close();
+
+ // Open 2 files with same name
+ os0 = client0.create(TEST_DESTINATION_PATH + "/file2", true);
+ os1 = client1.create(TEST_DESTINATION_PATH + "/file2", true);
+ result =
+ routerProtocol.listOpenFiles(0, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
+ testPath);
+ // Should list one file only
+ assertEquals(1, result.size());
+ assertEquals(routerClient.getFileInfo(TEST_DESTINATION_PATH + "/file2").getFileId(),
+ result.get(0).getId());
+ ite = routerClient.listOpenFiles(testPath);
+ routerClient.open(testPath + "/file2");
+ while (ite.hasNext()) {
+ OpenFileEntry ofe = ite.next();
+ assertTrue(ofe.getFilePath().equals(testPath + "/file2"));
+ }
+ os0.close();
+ os1.close();
+ }
+
+ @Test
+ public void testMultipleDestinationsMultipleBatches() throws Exception {
+ String testPath = "/" + getMethodName();
+ createMountTableEntry(testPath, cluster.getNameservices());
+
+ // Make ns1 have a much bigger inodeid than ns0
+ cluster.getNamenode("ns0",
null).getNamenode().getNamesystem().getFSDirectory()
+ .resetLastInodeIdWithoutChecking((long) 1E6);
+ cluster.getNamenode("ns1",
null).getNamenode().getNamesystem().getFSDirectory()
+ .resetLastInodeIdWithoutChecking((long) 2E6);
+ runBatchListOpenFilesTest(testPath);
+
+ // Rerun the test with ns0 having a much bigger inodeid than ns1
+ cluster.getNamenode("ns0",
null).getNamenode().getNamesystem().getFSDirectory()
+ .resetLastInodeIdWithoutChecking((long) 4E6);
+ cluster.getNamenode("ns1",
null).getNamenode().getNamesystem().getFSDirectory()
+ .resetLastInodeIdWithoutChecking((long) 3E6);
+ runBatchListOpenFilesTest(testPath);
+ }
+
+ private static void runBatchListOpenFilesTest(String testPath) throws IOException {
+ // Open 3 batches on both namespaces
+ OutputStream[] oss0 = new OutputStream[3 * BATCH_SIZE];
+ OutputStream[] oss1 = new OutputStream[3 * BATCH_SIZE];
+ for (int i = 0; i < 3 * BATCH_SIZE; i++) {
+ oss0[i] = client0.create(TEST_DESTINATION_PATH + "/file0a_" + i, true);
+ oss1[i] = client1.create(TEST_DESTINATION_PATH + "/file1a_" + i, true);
+ }
+ RemoteIterator<OpenFileEntry> ite = routerClient.listOpenFiles(testPath);
+ List<OpenFileEntry> allEntries = new ArrayList<>();
+ while (ite.hasNext()) {
+ allEntries.add(ite.next());
+ }
+ // All files should be reported once
+ assertEquals(3 * 2 * BATCH_SIZE, allEntries.size());
+
+ // Clean up
+ for (int i = 0; i < 3 * BATCH_SIZE; i++) {
+ oss0[i].close();
+ oss1[i].close();
+ }
+ client0.delete(TEST_DESTINATION_PATH, true);
+ client1.delete(TEST_DESTINATION_PATH, true);
+ }
+
+ /**
+ * Creates a mount with custom source path and some fixed destination paths.
+ */
+ private static void createMountTableEntry(String sourcePath, List<String> nsIds)
+ throws Exception {
+ Map<String, String> destMap = new HashMap<>();
+ for (String nsId : nsIds) {
+ destMap.put(nsId, TEST_DESTINATION_PATH);
+ }
+ MountTable newEntry = MountTable.newInstance(sourcePath, destMap);
+ if (nsIds.size() > 1) {
+ newEntry.setDestOrder(DestinationOrder.HASH_ALL);
+ }
+ AddMountTableEntryRequest addRequest = AddMountTableEntryRequest.newInstance(newEntry);
+ AddMountTableEntryResponse addResponse =
+ getAdminClient(routerContext.getRouter()).getMountTableManager()
+ .addMountTableEntry(addRequest);
+ assertTrue(addResponse.getStatus());
+ routerContext.getRouter().getStateStore().refreshCaches(true);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index a6bd57b4963..a64f06916af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -2073,8 +2073,12 @@ void resetLastInodeId(long newValue) throws IOException {
}
}
- /** Should only be used for tests to reset to any value */
- void resetLastInodeIdWithoutChecking(long newValue) {
+ /**
+ * Should only be used for tests to reset to any value.
+ * @param newValue new value to set to
+ */
+ @VisibleForTesting
+ public void resetLastInodeIdWithoutChecking(long newValue) {
inodeId.setCurrentValue(newValue);
}
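
For context: after this change, a listOpenFiles call sent to a router is fanned out to every subcluster backing the mount point, de-duplicated, and returned as a single batch sorted by inodeId, with file paths rewritten to the router-side (mount table) form. Below is a minimal client-side sketch of using the API through a router; the router address, port, and mount path are placeholders, not values taken from this commit.

import java.net.URI;
import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;

public class ListOpenFilesViaRouter {
  public static void main(String[] args) throws Exception {
    // Point the admin client at the router RPC endpoint (placeholder host/port).
    Configuration conf = new Configuration();
    HdfsAdmin admin = new HdfsAdmin(new URI("hdfs://router-host:8888"), conf);

    // List open files under a mount point; the router merges entries from all
    // subclusters behind the mount and reports mount-table paths.
    RemoteIterator<OpenFileEntry> it =
        admin.listOpenFiles(EnumSet.of(OpenFilesType.ALL_OPEN_FILES), "/mount/path");
    while (it.hasNext()) {
      OpenFileEntry entry = it.next();
      System.out.println(entry.getFilePath() + " held by " + entry.getClientName());
    }
  }
}

The same listing should also be reachable through the CLI (hdfs dfsadmin -listOpenFiles -path /mount/path) when fs.defaultFS points at the router.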
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]