ZanderXu commented on code in PR #7075:
URL: https://github.com/apache/hadoop/pull/7075#discussion_r1810591947
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java:
##########
@@ -1959,8 +1960,45 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long
prevId)
public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
throws IOException {
- rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
- return null;
+ rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+ List<RemoteLocation> locations = rpcServer.getLocationsForPath(path,
false, false);
+ RemoteMethod method =
+ new RemoteMethod("listOpenFiles", new Class<?>[] {long.class,
EnumSet.class, String.class},
+ prevId, openFilesTypes, new RemoteParam());
+ Map<RemoteLocation, BatchedEntries> results =
+ rpcClient.invokeConcurrent(locations, method, true, false, -1,
BatchedEntries.class);
+
+ // Get the largest inodeIds for each namespace, and the smallest inodeId
of them
+ // then ignore all entries above this id to keep a consistent prevId for
the next listOpenFiles
+ long minOfMax = Long.MAX_VALUE;
+ for (BatchedEntries nsEntries : results.values()) {
Review Comment:
Is there a way to iterate over these `BatchedEntries` only once to get the result?
Looping over them twice seems somewhat inefficient.
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java:
##########
@@ -1959,8 +1960,45 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long
prevId)
public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
throws IOException {
- rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
- return null;
+ rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+ List<RemoteLocation> locations = rpcServer.getLocationsForPath(path,
false, false);
+ RemoteMethod method =
+ new RemoteMethod("listOpenFiles", new Class<?>[] {long.class,
EnumSet.class, String.class},
+ prevId, openFilesTypes, new RemoteParam());
+ Map<RemoteLocation, BatchedEntries> results =
+ rpcClient.invokeConcurrent(locations, method, true, false, -1,
BatchedEntries.class);
+
+ // Get the largest inodeIds for each namespace, and the smallest inodeId
of them
+ // then ignore all entries above this id to keep a consistent prevId for
the next listOpenFiles
+ long minOfMax = Long.MAX_VALUE;
+ for (BatchedEntries nsEntries : results.values()) {
+ // Only need to care about namespaces that still have more files to
report
+ if (!nsEntries.hasMore()) {
+ continue;
+ }
+ long max = 0;
+ for (int i = 0; i < nsEntries.size(); i++) {
+ max = Math.max(max, ((OpenFileEntry) nsEntries.get(i)).getId());
+ }
+ minOfMax = Math.min(minOfMax, max);
+ }
+ // Concatenate all entries into one result, sorted by inodeId
+ List<OpenFileEntry> routerEntries = new ArrayList<>();
+ boolean hasMore = false;
+ for (Map.Entry<RemoteLocation, BatchedEntries> entry : results.entrySet())
{
+ BatchedEntries nsEntries = entry.getValue();
+ hasMore |= nsEntries.hasMore();
+ for (int i = 0; i< nsEntries.size(); i++) {
Review Comment:
Nit: add a space before `<`: `for (int i = 0; i < nsEntries.size(); i++) {`
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java:
##########
@@ -1959,8 +1960,45 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long
prevId)
public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path)
throws IOException {
- rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
- return null;
+ rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+ List<RemoteLocation> locations = rpcServer.getLocationsForPath(path,
false, false);
+ RemoteMethod method =
+ new RemoteMethod("listOpenFiles", new Class<?>[] {long.class,
EnumSet.class, String.class},
+ prevId, openFilesTypes, new RemoteParam());
+ Map<RemoteLocation, BatchedEntries> results =
+ rpcClient.invokeConcurrent(locations, method, true, false, -1,
BatchedEntries.class);
+
+ // Get the largest inodeIds for each namespace, and the smallest inodeId
of them
+ // then ignore all entries above this id to keep a consistent prevId for
the next listOpenFiles
+ long minOfMax = Long.MAX_VALUE;
+ for (BatchedEntries nsEntries : results.values()) {
+ // Only need to care about namespaces that still have more files to
report
+ if (!nsEntries.hasMore()) {
+ continue;
+ }
+ long max = 0;
+ for (int i = 0; i < nsEntries.size(); i++) {
+ max = Math.max(max, ((OpenFileEntry) nsEntries.get(i)).getId());
+ }
+ minOfMax = Math.min(minOfMax, max);
+ }
+ // Concatenate all entries into one result, sorted by inodeId
+ List<OpenFileEntry> routerEntries = new ArrayList<>();
+ boolean hasMore = false;
+ for (Map.Entry<RemoteLocation, BatchedEntries> entry : results.entrySet())
{
+ BatchedEntries nsEntries = entry.getValue();
+ hasMore |= nsEntries.hasMore();
+ for (int i = 0; i< nsEntries.size(); i++) {
+ OpenFileEntry ofe = (OpenFileEntry) nsEntries.get(i);
+ if (ofe.getId() > minOfMax) {
+ hasMore = true;
+ continue;
Review Comment:
Should this be `break` instead of `continue`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]