Copilot commented on code in PR #8072:
URL: https://github.com/apache/hadoop/pull/8072#discussion_r2612818506


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java:
##########
@@ -1977,8 +1979,68 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long 
prevId)
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
       EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
           throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
-    return null;
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, 
false, false);
+    RemoteMethod method =
+        new RemoteMethod("listOpenFiles", new Class<?>[] {long.class, 
EnumSet.class, String.class},
+            prevId, openFilesTypes, new RemoteParam());
+    Map<RemoteLocation, BatchedEntries> results =
+        rpcClient.invokeConcurrent(locations, method, true, false, -1, 
BatchedEntries.class);
+
+    // Get the largest inodeIds for each namespace, and the smallest inodeId 
of them
+    // then ignore all entries above this id to keep a consistent prevId for 
the next listOpenFiles
+    long minOfMax = Long.MAX_VALUE;
+    for (BatchedEntries nsEntries : results.values()) {
+      // Only need to care about namespaces that still have more files to 
report
+      if (!nsEntries.hasMore()) {
+        continue;
+      }
+      long max = 0;
+      for (int i = 0; i < nsEntries.size(); i++) {
+        max = Math.max(max, ((OpenFileEntry) nsEntries.get(i)).getId());
+      }
+      minOfMax = Math.min(minOfMax, max);
+    }
+    // Concatenate all entries into one result, sorted by inodeId
+    boolean hasMore = false;
+    Map<String, OpenFileEntry> routerEntries = new HashMap<>();
+    Map<String, RemoteLocation> resolvedPaths = new HashMap<>();
+    for (Map.Entry<RemoteLocation, BatchedEntries> entry : results.entrySet()) 
{
+      BatchedEntries nsEntries = entry.getValue();
+      hasMore |= nsEntries.hasMore();
+      for (int i = 0; i < nsEntries.size(); i++) {
+        OpenFileEntry ofe = (OpenFileEntry) nsEntries.get(i);
+        if (ofe.getId() > minOfMax) {
+          hasMore = true;
+          break;
+        }
+        RemoteLocation remoteLoc = entry.getKey();
+        String routerPath = 
ofe.getFilePath().replaceFirst(remoteLoc.getDest(), remoteLoc.getSrc());

Review Comment:
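   The path replacement using `replaceFirst` could be unsafe if either path 
contains regex special characters, because `String.replaceFirst` treats its 
first argument as a regular expression. For example, if `remoteLoc.getDest()` 
contains characters like `.` or `*`, they will be interpreted as regex patterns 
rather than literal strings. Likewise, a `$` or `\` in `remoteLoc.getSrc()` is 
given special meaning in the replacement string. Consider using a literal 
string replacement method, or escaping the pattern with `Pattern.quote()` and 
the replacement with `Matcher.quoteReplacement()`.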



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java:
##########
@@ -1977,8 +1979,68 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long 
prevId)
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
       EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
           throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
-    return null;
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, 
false, false);
+    RemoteMethod method =
+        new RemoteMethod("listOpenFiles", new Class<?>[] {long.class, 
EnumSet.class, String.class},
+            prevId, openFilesTypes, new RemoteParam());
+    Map<RemoteLocation, BatchedEntries> results =
+        rpcClient.invokeConcurrent(locations, method, true, false, -1, 
BatchedEntries.class);
+
+    // Get the largest inodeIds for each namespace, and the smallest inodeId 
of them
+    // then ignore all entries above this id to keep a consistent prevId for 
the next listOpenFiles
+    long minOfMax = Long.MAX_VALUE;
+    for (BatchedEntries nsEntries : results.values()) {
+      // Only need to care about namespaces that still have more files to 
report
+      if (!nsEntries.hasMore()) {
+        continue;
+      }
+      long max = 0;
+      for (int i = 0; i < nsEntries.size(); i++) {
+        max = Math.max(max, ((OpenFileEntry) nsEntries.get(i)).getId());
+      }
+      minOfMax = Math.min(minOfMax, max);
+    }
+    // Concatenate all entries into one result, sorted by inodeId
+    boolean hasMore = false;
+    Map<String, OpenFileEntry> routerEntries = new HashMap<>();
+    Map<String, RemoteLocation> resolvedPaths = new HashMap<>();
+    for (Map.Entry<RemoteLocation, BatchedEntries> entry : results.entrySet()) 
{
+      BatchedEntries nsEntries = entry.getValue();
+      hasMore |= nsEntries.hasMore();
+      for (int i = 0; i < nsEntries.size(); i++) {
+        OpenFileEntry ofe = (OpenFileEntry) nsEntries.get(i);
+        if (ofe.getId() > minOfMax) {
+          hasMore = true;
+          break;
+        }
+        RemoteLocation remoteLoc = entry.getKey();
+        String routerPath = 
ofe.getFilePath().replaceFirst(remoteLoc.getDest(), remoteLoc.getSrc());
+        OpenFileEntry newEntry =
+            new OpenFileEntry(ofe.getId(), routerPath, ofe.getClientName(),
+                ofe.getClientMachine());
+        // An existing file already resolves to the same path.
+        // Resolve according to mount table and keep the best path.
+        if (resolvedPaths.containsKey(routerPath)) {
+          PathLocation pathLoc = 
subclusterResolver.getDestinationForPath(routerPath);
+          List<String> namespaces = pathLoc.getDestinations().stream().map(
+              RemoteLocation::getNameserviceId).collect(
+                  Collectors.toList());
+          int existingIdx = 
namespaces.indexOf(resolvedPaths.get(routerPath).getNameserviceId());
+          int currentIdx = namespaces.indexOf(remoteLoc.getNameserviceId());
+          if (currentIdx < existingIdx && currentIdx != -1) {
+            routerEntries.put(routerPath, newEntry);
+            resolvedPaths.put(routerPath, remoteLoc);
+          }

Review Comment:
   When duplicate file paths are detected (line 2024), the code resolves the 
path using `subclusterResolver.getDestinationForPath(routerPath)` to determine 
which namespace should take precedence. However, this resolver call is repeated 
every time a duplicate is encountered, even for a `routerPath` that has already 
been resolved, which could be expensive. Consider caching the `PathLocation` 
result per `routerPath` or restructuring to avoid repeated lookups for the same 
path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to