This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-7593 by this push:
     new 082d759448 HDDS-8830. Add admin CLI to list open files (#5920)
082d759448 is described below

commit 082d759448e3995b9330f8e914bb5bab171acaf2
Author: Siyao Meng <[email protected]>
AuthorDate: Thu Jan 18 19:24:12 2024 -0800

    HDDS-8830. Add admin CLI to list open files (#5920)
---
 hadoop-hdds/docs/content/tools/Admin.md            | 141 +++++++++++++-
 .../hadoop/ozone/client/MockOmTransport.java       |  11 ++
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |   1 +
 .../ozone/om/helpers/ListOpenFilesResult.java      | 115 ++++++++++++
 .../hadoop/ozone/om/helpers/OpenKeySession.java    |   3 +
 .../ozone/om/protocol/OzoneManagerProtocol.java    |  12 ++
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  29 +++
 .../dist/src/main/compose/ozone/docker-config      |   2 +
 .../org/apache/hadoop/ozone/om/TestOmMetrics.java  |  12 +-
 .../hadoop/ozone/shell/TestOzoneShellHA.java       | 139 ++++++++++++++
 .../ozone/shell/TestOzoneShellHAWithFSO.java       |   2 +
 .../src/main/proto/OmClientProtocol.proto          |  24 +++
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |  48 ++++-
 .../org/apache/hadoop/ozone/audit/OMAction.java    |   4 +-
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |  12 +-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     | 116 +++++++++++-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   | 113 ++++++++++--
 .../protocolPB/OzoneManagerRequestHandler.java     |  34 ++++
 .../hadoop/ozone/om/TestOmMetadataManager.java     | 139 +++++++++++---
 .../ozone/admin/om/ListOpenFilesSubCommand.java    | 202 +++++++++++++++++++++
 .../org/apache/hadoop/ozone/admin/om/OMAdmin.java  |   1 +
 21 files changed, 1112 insertions(+), 48 deletions(-)

diff --git a/hadoop-hdds/docs/content/tools/Admin.md 
b/hadoop-hdds/docs/content/tools/Admin.md
index a05065e8eb..e89331230f 100644
--- a/hadoop-hdds/docs/content/tools/Admin.md
+++ b/hadoop-hdds/docs/content/tools/Admin.md
@@ -32,4 +32,143 @@ And quick overview about the available functionalities:
  * `ozone admin replicationmanager`: Can be used to check the status of the 
replications (and start / stop replication in case of emergency).
  * `ozone admin om`: Ozone Manager HA related tool to get information about 
the current cluster.
 
-For more detailed usage see the output of the `--help`.
+For more detailed usage see the output of `--help`.
+
+```bash
+$ ozone admin --help
+Usage: ozone admin [-hV] [--verbose] [-conf=<configurationPath>]
+                   [-D=<String=String>]... [COMMAND]
+Developer tools for Ozone Admin operations
+      -conf=<configurationPath>
+
+  -D, --set=<String=String>
+
+  -h, --help      Show this help message and exit.
+  -V, --version   Print version information and exit.
+      --verbose   More verbose output. Show the stack trace of the errors.
+Commands:
+  containerbalancer   ContainerBalancer specific operations
+  replicationmanager  ReplicationManager specific operations
+  safemode            Safe mode specific operations
+  printTopology       Print a tree of the network topology as reported by SCM
+  cert                Certificate related operations
+  container           Container specific operations
+  datanode            Datanode specific operations
+  pipeline            Pipeline specific operations
+  namespace           Namespace Summary specific admin operations
+  om                  Ozone Manager specific admin operations
+  reconfig            Dynamically reconfigure server without restarting it
+  scm                 Ozone Storage Container Manager specific admin operations
+```
+
+Some of those subcommand usages has been detailed in their dedicated feature 
documentation pages. For instance, [Decommissioning]({{<ref 
"feature/Decommission.md">}}), [Non-Rolling Upgrades and Downgrades]({{<ref 
"feature/Nonrolling-Upgrade.md">}}).
+
+
+## List open files
+
+List open files admin command lists open keys in Ozone Manager's 
`OpenKeyTable`.
+Works for all bucket types.
+Argument `--prefix` could be root (`/`), path to a bucket (`/vol1/buck`) or a 
key prefix (for FSO buckets the key prefix could contain parent object ID). But 
it can't be a volume.
+
+```bash
+$ ozone admin om lof --help
+Usage: ozone admin om list-open-files [-hV] [--json] [-l=<limit>]
+                                      [-p=<pathPrefix>] [-s=<startItem>]
+                                      [--service-host=<omHost>]
+                                      [--service-id=<omServiceId>]
+Lists open files (keys) in Ozone Manager.
+  -h, --help                Show this help message and exit.
+      --json                Format output as JSON
+  -l, --length=<limit>      Maximum number of items to list
+  -p, --prefix=<pathPrefix> Filter results by the specified path on the server
+                              side.
+  -s, --start=<startItem>   The item to start the listing from.
+                            i.e. continuation token. This will be excluded from
+                              the result.
+      --service-host=<omHost>
+                            Ozone Manager Host. If OM HA is enabled, use
+                              --service-id instead. If you must use
+                              --service-host with OM HA, this must point
+                              directly to the leader OM. This option is
+                              required when --service-id is not provided or
+                              when HA is not enabled.
+      --service-id, --om-service-id=<omServiceId>
+                            Ozone Manager Service ID
+  -V, --version             Print version information and exit.
+```
+
+### Example usages
+
+- In human-readable format, list open files (keys) under bucket 
`/volumelof/buck1` with a batch size of 3:
+
+```bash
+$ ozone admin om lof --service-id=om-service-test1 --length=3 
--prefix=/volumelof/buck1
+```
+
+```bash
+5 total open files (est.). Showing 3 open files (limit 3) under path prefix:
+  /volume-lof/buck1
+
+Client ID              Creation time   Hsync'ed        Open File Path
+111726338148007937     1704808626523   No              
/volume-lof/buck1/-9223372036854774527/key0
+111726338151415810     1704808626578   No              
/volume-lof/buck1/-9223372036854774527/key1
+111726338152071171     1704808626588   No              
/volume-lof/buck1/-9223372036854774527/key2
+
+To get the next batch of open keys, run:
+  ozone admin om lof -id=om-service-test1 --length=3 
--prefix=/volume-lof/buck1 
--start=/-9223372036854775552/-9223372036854775040/-9223372036854774527/key2/111726338152071171
+```
+
+- In JSON, list open files (keys) under bucket `/volumelof/buck1` with a batch 
size of 3:
+
+```bash
+$ ozone admin om lof --service-id=om-service-test1 --length=3 
--prefix=/volumelof/buck1 --json
+```
+
+```json
+{
+  "openKeys" : [ {
+    "keyInfo" : {
+      "metadata" : { },
+      "objectID" : -9223372036854774015,
+      "updateID" : 7,
+      "parentObjectID" : -9223372036854774527,
+      "volumeName" : "volume-lof",
+      "bucketName" : "buck1",
+      "keyName" : "key0",
+      "dataSize" : 4194304,
+      "keyLocationVersions" : [ ... ],
+      "creationTime" : 1704808722487,
+      "modificationTime" : 1704808722487,
+      "replicationConfig" : {
+        "replicationFactor" : "THREE",
+        "requiredNodes" : 3,
+        "replicationType" : "RATIS"
+      },
+      "fileName" : "key0",
+      "acls" : [ ... ],
+      "path" : "-9223372036854774527/key0",
+      "file" : true,
+      "replicatedSize" : 12582912,
+      "objectInfo" : "OMKeyInfo{volume='volume-lof', bucket='buck1', 
key='key0', dataSize='4194304', creationTime='1704808722487', 
objectID='-9223372036854774015', parentID='-9223372036854774527', 
replication='RATIS/THREE', fileChecksum='null}",
+      "hsync" : false,
+      "latestVersionLocations" : { ... },
+      "updateIDset" : true
+    },
+    "openVersion" : 0,
+    "clientId" : 111726344437039105
+  }, {
+    "keyInfo" : { ... },
+    "openVersion" : 0,
+    "clientId" : 111726344440578050
+  }, {
+    "keyInfo" : { ... },
+    "openVersion" : 0,
+    "clientId" : 111726344441233411
+  } ],
+  "totalOpenKeyCount" : 5,
+  "hasMore" : true,
+  "contToken" : 
"/-9223372036854775552/-9223372036854775040/-9223372036854774527/key2/111726344441233411"
+}
+```
+
+Note in JSON output mode, field `contToken` won't show up at all in the result 
if there are no more entries after the batch (i.e. when `hasMore` is `false`).
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
index e91a71a856..e4a8a80a63 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
@@ -45,6 +45,8 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoVol
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocationList;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListOpenFilesRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListOpenFilesResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LookupKeyRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LookupKeyResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
@@ -125,6 +127,10 @@ public class MockOmTransport implements OmTransport {
       return response(payload,
           r -> r.setServiceListResponse(
               serviceList(payload.getServiceListRequest())));
+    case ListOpenFiles:
+      return response(payload,
+          r -> r.setListOpenFilesResponse(
+              listOpenFiles(payload.getListOpenFilesRequest())));
     case AllocateBlock:
       return response(payload, r -> r.setAllocateBlockResponse(
           allocateBlock(payload.getAllocateBlockRequest())));
@@ -324,6 +330,11 @@ public class MockOmTransport implements OmTransport {
         .build();
   }
 
+  private ListOpenFilesResponse listOpenFiles(
+      ListOpenFilesRequest listOpenFilesRequest) {
+    return ListOpenFilesResponse.newBuilder().build();
+  }
+
   private OMResponse response(OMRequest payload,
       Function<OMResponse.Builder, OMResponse.Builder> function) {
     Builder builder = OMResponse.newBuilder();
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index f23a703bd0..c6e410bb45 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -244,6 +244,7 @@ public final class OmUtils {
     case ListKeysLight:
     case ListTrash:
     case ServiceList:
+    case ListOpenFiles:
     case ListMultiPartUploadParts:
     case GetFileStatus:
     case LookupFile:
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ListOpenFilesResult.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ListOpenFilesResult.java
new file mode 100644
index 0000000000..67ef5dfec2
--- /dev/null
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ListOpenFilesResult.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.helpers;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Encapsulates the result of listOpenFiles. It has a list of
+ * {@link OpenKeySession} and a boolean flag indicating if there
+ * are more entries that are not fetched after the current batch of result.
+ */
+public class ListOpenFilesResult {
+  /**
+   * Number of total open files globally.
+   */
+  @JsonProperty("totalOpenKeyCount")
+  private final long totalOpenKeyCount;
+  /**
+   * True if there are more entries after this batch under the given path.
+   */
+  @JsonProperty("hasMore")
+  private final boolean hasMore;
+  /**
+   * True if there are more entries after this batch under the given path.
+   */
+  @JsonProperty("contToken")
+  private final String continuationToken;
+  /**
+   * List of open files. Each has client ID and OmKeyInfo.
+   */
+  private final List<OpenKeySession> openKeySessionList;
+
+  public ListOpenFilesResult(long totalOpenKeyCount,
+                             boolean hasMore,
+                             String continuationToken,
+                             List<OpenKeySession> openKeySessionList) {
+    this.openKeySessionList = openKeySessionList;
+    this.hasMore = hasMore;
+    this.continuationToken = continuationToken;
+    this.totalOpenKeyCount = totalOpenKeyCount;
+  }
+
+  public ListOpenFilesResult(long totalOpenKeyCount,
+                             boolean hasMore,
+                             String continuationToken,
+                             List<Long> clientIDsList,
+                             List<KeyInfo> keyInfosList)
+      throws IOException {
+    this.openKeySessionList = getOpenKeySessionListFromPB(clientIDsList,
+        keyInfosList);
+    this.hasMore = hasMore;
+    this.continuationToken = continuationToken;
+    this.totalOpenKeyCount = totalOpenKeyCount;
+  }
+
+  /**
+   * Combines clientIDsList and keyInfosList into OpenKeySessionList for
+   * transfer to the client.
+   */
+  private List<OpenKeySession> getOpenKeySessionListFromPB(
+      List<Long> clientIDsList, List<KeyInfo> keyInfosList)
+      throws IOException {
+
+    Preconditions.checkArgument(clientIDsList.size() == keyInfosList.size(),
+        "clientIDsList size (" + clientIDsList.size() + ") should be " +
+            "the same as keyInfosList's (" + keyInfosList.size() + ")");
+
+    List<OpenKeySession> res = new ArrayList<>();
+
+    for (int i = 0; i < clientIDsList.size(); i++) {
+      OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfosList.get(i));
+      res.add(new OpenKeySession(clientIDsList.get(i),
+          omKeyInfo,
+          omKeyInfo.getLatestVersionLocations().getVersion()));
+    }
+    return res;
+  }
+
+  public long getTotalOpenKeyCount() {
+    return totalOpenKeyCount;
+  }
+
+  public boolean hasMore() {
+    return hasMore;
+  }
+
+  public String getContinuationToken() {
+    return continuationToken;
+  }
+
+  public List<OpenKeySession> getOpenKeys() {
+    return openKeySessionList;
+  }
+}
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OpenKeySession.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OpenKeySession.java
index 11ee622494..d3b32815d8 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OpenKeySession.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OpenKeySession.java
@@ -17,12 +17,15 @@
  */
 package org.apache.hadoop.ozone.om.helpers;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 /**
  * This class represents a open key "session". A session here means a key is
  * opened by a specific client, the client sends the handler to server, such
  * that servers can recognize this client, and thus know how to close the key.
  */
 public class OpenKeySession {
+  @JsonProperty("clientId")
   private final long id;
   private final OmKeyInfo keyInfo;
   // the version of the key when it is being opened in this session.
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 824346058d..f60fe686d0 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.DBUpdates;
 import org.apache.hadoop.ozone.om.helpers.DeleteTenantState;
 import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
+import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDeleteKeys;
@@ -422,6 +423,17 @@ public interface OzoneManagerProtocol
 
   ServiceInfoEx getServiceInfo() throws IOException;
 
+  /**
+   * List open files in OM.
+   * @param path One of: root "/", path to a bucket, key path, or key prefix
+   * @param maxKeys Limit the number of keys that can be returned in this 
batch.
+   * @param contToken Continuation token.
+   * @return ListOpenFilesResult
+   * @throws IOException
+   */
+  ListOpenFilesResult listOpenFiles(String path, int maxKeys, String contToken)
+      throws IOException;
+
   /**
    * Transfer the raft leadership.
    *
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 84c33db925..57a9d23ee9 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.om.helpers.DBUpdates;
 import org.apache.hadoop.ozone.om.helpers.DeleteTenantState;
 import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
 import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
+import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDeleteKeys;
@@ -134,6 +135,8 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListKey
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListKeysLightResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListMultipartUploadsRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListMultipartUploadsResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListOpenFilesRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListOpenFilesResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusLightResponse;
@@ -1795,6 +1798,32 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
         resp.getCaCertificate(), resp.getCaCertsList());
   }
 
+  @Override
+  public ListOpenFilesResult listOpenFiles(String path,
+                                           int maxKeys,
+                                           String contToken)
+      throws IOException {
+    ListOpenFilesRequest req = ListOpenFilesRequest.newBuilder()
+        .setPath(path)
+        .setCount(maxKeys)
+        .setToken(contToken)
+        .build();
+
+    OMRequest omRequest = createOMRequest(Type.ListOpenFiles)
+        .setListOpenFilesRequest(req)
+        .build();
+
+    final ListOpenFilesResponse resp = handleError(submitRequest(omRequest))
+        .getListOpenFilesResponse();
+
+    return new ListOpenFilesResult(
+        resp.getTotalOpenKeyCount(),
+        resp.getHasMore(),
+        resp.hasContinuationToken() ? resp.getContinuationToken() : null,
+        resp.getClientIDList(),
+        resp.getKeyInfoList());
+  }
+
   @Override
   public void transferLeadership(String newLeaderId)
       throws IOException {
diff --git a/hadoop-ozone/dist/src/main/compose/ozone/docker-config 
b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
index 729f036e15..27208cf654 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
@@ -54,6 +54,8 @@ OZONE-SITE.XML_hdds.scm.wait.time.after.safemode.exit=30s
 
 OZONE-SITE.XML_dfs.container.ratis.datastream.enabled=true
 
+OZONE-SITE.XML_ozone.fs.hsync.enabled=true
+
 OZONE_CONF_DIR=/etc/hadoop
 OZONE_LOG_DIR=/var/log/hadoop
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
index 1a87b45079..0f8c82a558 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
@@ -305,7 +305,7 @@ public class TestOmMetrics {
     doKeyOps(keyArgs);
 
     MetricsRecordBuilder omMetrics = getMetrics("OMMetrics");
-    assertCounter("NumKeyOps", 7L, omMetrics);
+    assertCounter("NumKeyOps", 8L, omMetrics);
     assertCounter("NumKeyAllocate", 1L, omMetrics);
     assertCounter("NumKeyLookup", 1L, omMetrics);
     assertCounter("NumKeyDeletes", 1L, omMetrics);
@@ -313,12 +313,13 @@ public class TestOmMetrics {
     assertCounter("NumTrashKeyLists", 1L, omMetrics);
     assertCounter("NumKeys", 0L, omMetrics);
     assertCounter("NumInitiateMultipartUploads", 1L, omMetrics);
+    assertCounter("NumListOpenFiles", 1L, omMetrics);
 
     keyArgs = createKeyArgs(volumeName, bucketName,
         new ECReplicationConfig("rs-3-2-1024K"));
     doKeyOps(keyArgs);
     omMetrics = getMetrics("OMMetrics");
-    assertCounter("NumKeyOps", 14L, omMetrics);
+    assertCounter("NumKeyOps", 16L, omMetrics);
     assertCounter("EcKeyCreateTotal", 1L, omMetrics);
 
     keyArgs = createKeyArgs(volumeName, bucketName,
@@ -370,7 +371,7 @@ public class TestOmMetrics {
     doKeyOps(keyArgs);
 
     omMetrics = getMetrics("OMMetrics");
-    assertCounter("NumKeyOps", 28L, omMetrics);
+    assertCounter("NumKeyOps", 31L, omMetrics);
     assertCounter("NumKeyAllocate", 6L, omMetrics);
     assertCounter("NumKeyLookup", 3L, omMetrics);
     assertCounter("NumKeyDeletes", 4L, omMetrics);
@@ -813,6 +814,11 @@ public class TestOmMetrics {
       writeClient.initiateMultipartUpload(keyArgs);
     } catch (IOException ignored) {
     }
+
+    try {
+      writeClient.listOpenFiles("", 100, "");
+    } catch (IOException ignored) {
+    }
   }
 
   private OmKeyArgs createKeyArgs(String volumeName, String bucketName,
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java
index 92381829f0..628a101ea9 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.UUID;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
@@ -71,6 +72,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
 import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
 import static org.apache.hadoop.fs.FileSystem.TRASH_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
@@ -80,6 +83,7 @@ import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLU
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -89,7 +93,10 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
+import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
 import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,6 +113,7 @@ import picocli.CommandLine.RunLast;
  * Inspired by TestS3Shell
  */
 @Timeout(300)
+@TestMethodOrder(OrderAnnotation.class)
 public class TestOzoneShellHA {
 
   private static final Logger LOG =
@@ -140,6 +148,7 @@ public class TestOzoneShellHA {
   @BeforeAll
   public static void init() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setBoolean(OZONE_FS_HSYNC_ENABLED, true);
     startCluster(conf);
   }
 
@@ -549,6 +558,134 @@ public class TestOzoneShellHA {
     execute(ozoneAdminShell, args);
   }
 
+  @Test
+  public void testAdminCmdListOpenFiles()
+      throws IOException, InterruptedException {
+
+    OzoneConfiguration conf = cluster.getConf();
+    final String hostPrefix = OZONE_OFS_URI_SCHEME + "://" + omServiceId;
+
+    OzoneConfiguration clientConf = getClientConfForOFS(hostPrefix, conf);
+    clientConf.setBoolean(OZONE_FS_HSYNC_ENABLED, true);
+    FileSystem fs = FileSystem.get(clientConf);
+
+    assertNotEquals(fs.getConf().get(OZONE_FS_HSYNC_ENABLED),
+        "false", OZONE_FS_HSYNC_ENABLED + " is set to false " +
+            "by external force. Must be true to allow hsync to function");
+
+    final String volumeName = "volume-lof";
+    final String bucketName = "buck1";
+
+    String dir1 = hostPrefix +
+        OM_KEY_PREFIX + volumeName +
+        OM_KEY_PREFIX + bucketName +
+        OM_KEY_PREFIX + "dir1";
+    // Create volume, bucket, dir
+    assertTrue(fs.mkdirs(new Path(dir1)));
+    String keyPrefix = OM_KEY_PREFIX + "key";
+
+    final int numKeys = 5;
+    String[] keys = new String[numKeys];
+
+    for (int i = 0; i < numKeys; i++) {
+      keys[i] = dir1 + keyPrefix + i;
+    }
+
+    int pageSize = 3;
+    String pathToBucket = "/" +  volumeName + "/" + bucketName;
+    FSDataOutputStream[] streams = new FSDataOutputStream[numKeys];
+
+    try {
+      // Create multiple keys and hold them open
+      for (int i = 0; i < numKeys; i++) {
+        streams[i] = fs.create(new Path(keys[i]));
+        streams[i].write(1);
+      }
+
+      // Wait for DB flush
+      cluster.getOzoneManager().awaitDoubleBufferFlush();
+
+      String[] args = new String[] {"om", "lof",
+          "--service-id", omServiceId,
+          "-l", String.valueOf(numKeys + 1),  // pagination
+          "-p", pathToBucket};
+      // Run listopenfiles
+      execute(ozoneAdminShell, args);
+      String cmdRes = getStdOut();
+      // Should have retrieved all 5 open keys
+      for (int i = 0; i < numKeys; i++) {
+        assertTrue(cmdRes.contains(keyPrefix + i));
+      }
+
+      // Try pagination
+      args = new String[] {"om", "lof",
+          "--service-id", omServiceId,
+          "-l", String.valueOf(pageSize),  // pagination
+          "-p", pathToBucket};
+      execute(ozoneAdminShell, args);
+      cmdRes = getStdOut();
+
+      // Should have retrieved the 1st page only (3 keys)
+      for (int i = 0; i < pageSize; i++) {
+        assertTrue(cmdRes.contains(keyPrefix + i));
+      }
+      for (int i = pageSize; i < numKeys; i++) {
+        assertFalse(cmdRes.contains(keyPrefix + i));
+      }
+      // No hsync'ed file/key at this point
+      assertFalse(cmdRes.contains("\tYes\t"));
+
+      // Get last line of the output which has the continuation token
+      String[] lines = cmdRes.split("\n");
+      String nextCmd = lines[lines.length - 1].trim();
+      String kw = "--start=";
+      String contToken =
+          nextCmd.substring(nextCmd.lastIndexOf(kw) + kw.length());
+
+      args = new String[] {"om", "lof",
+          "--service-id", omServiceId,
+          "-l", String.valueOf(pageSize),  // pagination
+          "-p", pathToBucket,
+          "-s", contToken};
+      execute(ozoneAdminShell, args);
+      cmdRes = getStdOut();
+
+      // Should have retrieved the 2nd page only (2 keys)
+      for (int i = 0; i < pageSize - 1; i++) {
+        assertFalse(cmdRes.contains(keyPrefix + i));
+      }
+      // Note: key2 is shown in the continuation token prompt
+      for (int i = pageSize - 1; i < numKeys; i++) {
+        assertTrue(cmdRes.contains(keyPrefix + i));
+      }
+
+      // hsync last key
+      streams[numKeys - 1].hsync();
+      // Wait for flush
+      cluster.getOzoneManager().awaitDoubleBufferFlush();
+
+      execute(ozoneAdminShell, args);
+      cmdRes = getStdOut();
+
+      // Verify that only one key is hsync'ed
+      assertTrue(cmdRes.contains("\tYes\t"), "One key should be hsync'ed");
+      assertTrue(cmdRes.contains("\tNo\t"), "One key should not be hsync'ed");
+    } finally {
+      // Cleanup
+      IOUtils.closeQuietly(streams);
+    }
+
+  }
+
+  /**
+   * Return stdout as a String, then clears existing output.
+   */
+  private String getStdOut() throws UnsupportedEncodingException {
+    String res = out.toString(UTF_8.name());
+    out.reset();
+    return res;
+  }
+
   /**
    * Helper function to retrieve Ozone client configuration for trash testing.
    * @param hostPrefix Scheme + Authority. e.g. ofs://om-service-test1
@@ -1451,6 +1588,8 @@ public class TestOzoneShellHA {
   }
 
   @Test
+  // Run this UT last. This interferes with testAdminCmdListOpenFiles
+  @Order(Integer.MAX_VALUE)
   public void testRecursiveBucketDelete()
       throws Exception {
     String volume1 = "volume50";
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHAWithFSO.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHAWithFSO.java
index 0c6a5e8143..485cb2c919 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHAWithFSO.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHAWithFSO.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.shell;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -37,6 +38,7 @@ public class TestOzoneShellHAWithFSO extends TestOzoneShellHA 
{
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT,
         OMConfigKeys.OZONE_BUCKET_LAYOUT_FILE_SYSTEM_OPTIMIZED);
+    conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
     startCluster(conf);
   }
 
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index cf5630ecee..1dbe0853a1 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -146,6 +146,8 @@ enum Type {
   SetSnapshotProperty = 128;
   ListStatusLight = 129;
   GetSnapshotInfo = 130;
+
+  ListOpenFiles = 132;
 }
 
 enum SafeMode {
@@ -281,6 +283,8 @@ message OMRequest {
   optional MultipartUploadsExpiredAbortRequest 
multipartUploadsExpiredAbortRequest = 126;
   optional SetSnapshotPropertyRequest       SetSnapshotPropertyRequest     = 
127;
   optional SnapshotInfoRequest              SnapshotInfoRequest            = 
128;
+
+  optional ListOpenFilesRequest             ListOpenFilesRequest           = 
132;
 }
 
 message OMResponse {
@@ -403,6 +407,8 @@ message OMResponse {
   optional ListStatusLightResponse           listStatusLightResponse       = 
129;
   optional SnapshotInfoResponse              SnapshotInfoResponse          = 
130;
   optional OMLockDetailsProto                omLockDetails                 = 
131;
+
+  optional ListOpenFilesResponse             ListOpenFilesResponse         = 
132;
 }
 
 enum Status {
@@ -1529,6 +1535,24 @@ message CancelPrepareResponse {
 
 }
 
+message ListOpenFilesRequest {
+  optional string path = 1;
+  optional uint32 count = 2;
+  optional string token = 3;
+}
+
+message ListOpenFilesResponse {
+  // size of openKeyTable and openFileTable combined
+  optional uint64 totalOpenKeyCount = 1;
+  // indicates if there are more entries to be retrieved under the given path
+  optional bool hasMore = 2;
+  // continuation token should match a dbKey in openKeyTable or openFileTable
+  optional string continuationToken = 3;
+  // result
+  repeated uint64 clientID = 4;
+  repeated KeyInfo keyInfo = 5;
+}
+
 message ServicePort {
     enum Type {
         RPC = 1;
diff --git 
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
 
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 68c5cf758e..00bf575205 100644
--- 
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ 
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.helpers.ListKeysResult;
+import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBUserPrincipalInfo;
@@ -55,6 +56,8 @@ import org.apache.hadoop.hdds.utils.db.Table;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.ozone.compaction.log.CompactionLogEntry;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
 /**
  * OM metadata manager interface.
  */
@@ -122,9 +125,17 @@ public interface OMMetadataManager extends 
DBStoreHAManager {
    * @param key    - key name
    * @return DB key as String.
    */
-
   String getOzoneKey(String volume, String bucket, String key);
 
+  /**
+   * Get DB key for a key or prefix in an FSO bucket given existing
+   * volume and bucket names.
+   */
+  String getOzoneKeyFSO(String volumeName,
+                        String bucketName,
+                        String keyPrefix)
+      throws IOException;
+
   /**
    * Given a volume, bucket and a key, return the corresponding DB directory
    * key.
@@ -149,6 +160,17 @@ public interface OMMetadataManager extends 
DBStoreHAManager {
    */
   String getOpenKey(String volume, String bucket, String key, long id);
 
+  /**
+   * Returns client ID in Long of an OpenKeyTable DB Key String.
+   * @param dbOpenKeyName An OpenKeyTable DB Key String.
+   * @return Client ID (Long)
+   */
+  static long getClientIDFromOpenKeyDBKey(String dbOpenKeyName) {
+    final int lastPrefix = dbOpenKeyName.lastIndexOf(OM_KEY_PREFIX);
+    final String clientIdString = dbOpenKeyName.substring(lastPrefix + 1);
+    return Long.parseLong(clientIdString);
+  }
+
   /**
    * Given a volume, check if it is empty, i.e there are no buckets inside it.
    *
@@ -188,6 +210,24 @@ public interface OMMetadataManager extends 
DBStoreHAManager {
                                  boolean hasSnapshot)
       throws IOException;
 
+  /**
+   * Inner implementation of listOpenFiles. Called after all the arguments are
+   * checked and processed by Ozone Manager.
+   * @param bucketLayout
+   * @param maxKeys
+   * @param dbOpenKeyPrefix
+   * @param hasContToken
+   * @param dbContTokenPrefix
+   * @return ListOpenFilesResult
+   * @throws IOException
+   */
+  ListOpenFilesResult listOpenFiles(BucketLayout bucketLayout,
+                                    int maxKeys,
+                                    String dbOpenKeyPrefix,
+                                    boolean hasContToken,
+                                    String dbContTokenPrefix)
+      throws IOException;
+
   /**
    * Returns a list of keys represented by {@link OmKeyInfo} in the given
    * bucket.
@@ -278,6 +318,12 @@ public interface OMMetadataManager extends 
DBStoreHAManager {
   List<OmVolumeArgs> listVolumes(String userName, String prefix,
       String startKey, int maxKeys) throws IOException;
 
+  /**
+   * Get total open key count (estimated, due to the nature of RocksDB impl)
+   * of both OpenKeyTable and OpenFileTable.
+   */
+  long getTotalOpenKeyCount() throws IOException;
+
   /**
    * Returns the names of up to {@code count} open keys whose age is
    * greater than or equal to {@code expireThreshold}.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
index 4e9039252f..e641d13270 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
@@ -102,7 +102,9 @@ public enum OMAction implements AuditAction {
   SNAPSHOT_INFO,
   SET_TIMES,
 
-  ABORT_EXPIRED_MULTIPART_UPLOAD;
+  ABORT_EXPIRED_MULTIPART_UPLOAD,
+
+  LIST_OPEN_FILES;
 
   @Override
   public String getAction() {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index ed5efbefe5..86fa867060 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -80,6 +80,7 @@ public class OMMetrics implements OmMetadataReaderMetrics {
   private @Metric MutableCounterLong numCreateFile;
   private @Metric MutableCounterLong numLookupFile;
   private @Metric MutableCounterLong numListStatus;
+  private @Metric MutableCounterLong numListOpenFiles;
 
   private @Metric MutableCounterLong numOpenKeyDeleteRequests;
   private @Metric MutableCounterLong numOpenKeysSubmittedForDeletion;
@@ -176,6 +177,7 @@ public class OMMetrics implements OmMetadataReaderMetrics {
   private @Metric MutableCounterLong numCreateFileFails;
   private @Metric MutableCounterLong numLookupFileFails;
   private @Metric MutableCounterLong numListStatusFails;
+  private @Metric MutableCounterLong numListOpenFilesFails;
   private @Metric MutableCounterLong getNumGetKeyInfoFails;
 
   private @Metric MutableCounterLong numRecoverLeaseFails;
@@ -428,10 +430,18 @@ public class OMMetrics implements OmMetadataReaderMetrics 
{
   }
 
   public void incNumListS3BucketsFails() {
-    numBucketOps.incr();
     numBucketS3ListFails.incr();
   }
 
+  public void incNumListOpenFiles() {
+    numKeyOps.incr();
+    numListOpenFiles.incr();
+  }
+
+  public void incNumListOpenFilesFails() {
+    numListOpenFilesFails.incr();
+  }
+
   public void incNumInitiateMultipartUploads() {
     numKeyOps.incr();
     numInitiateMultipartUploads.incr();
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index e272cb8692..1969bce918 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.ozone.om.codec.TokenIdentifierCodec;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.helpers.ListKeysResult;
+import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBUserPrincipalInfo;
@@ -75,6 +76,7 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload;
 import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@@ -102,6 +104,7 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
 import static org.apache.hadoop.ozone.OzoneConsts.DB_TRANSIENT_MARKER;
+import static org.apache.hadoop.ozone.OzoneConsts.HSYNC_CLIENT_ID;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_LIMIT;
@@ -273,14 +276,14 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
   private Table bucketTable;
   private Table<String, OmKeyInfo> keyTable;
   private Table deletedTable;
-  private Table openKeyTable;
+  private Table<String, OmKeyInfo> openKeyTable;
   private Table<String, OmMultipartKeyInfo> multipartInfoTable;
   private Table<String, S3SecretValue> s3SecretTable;
   private Table dTokenTable;
   private Table prefixTable;
   private Table<String, OmDirectoryInfo> dirTable;
   private Table<String, OmKeyInfo> fileTable;
-  private Table openFileTable;
+  private Table<String, OmKeyInfo> openFileTable;
   private Table transactionInfoTable;
   private Table metaTable;
 
@@ -475,7 +478,7 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
   }
 
   @Override
-  public Table getOpenKeyTable(BucketLayout bucketLayout) {
+  public Table<String, OmKeyInfo> getOpenKeyTable(BucketLayout bucketLayout) {
     if (bucketLayout.isFileSystemOptimized()) {
       return openFileTable;
     }
@@ -842,6 +845,18 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
     return builder.toString();
   }
 
+  @Override
+  public String getOzoneKeyFSO(String volumeName,
+                               String bucketName,
+                               String keyPrefix)
+      throws IOException {
+    final long volumeId = getVolumeId(volumeName);
+    final long bucketId = getBucketId(volumeName, bucketName);
+    // FSO keyPrefix could look like: -9223372036854774527/key1
+    return getOzoneKey(Long.toString(volumeId),
+        Long.toString(bucketId), keyPrefix);
+  }
+
   @Override
   public String getOzoneDirKey(String volume, String bucket, String key) {
     key = OzoneFSUtils.addTrailingSlashIfNeeded(key);
@@ -1175,6 +1190,94 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
     return keyTable.iterator();
   }
 
+  @Override
+  public ListOpenFilesResult listOpenFiles(BucketLayout bucketLayout,
+                                           int maxKeys,
+                                           String dbOpenKeyPrefix,
+                                           boolean hasContToken,
+                                           String dbContTokenPrefix)
+      throws IOException {
+
+    List<OpenKeySession> openKeySessionList = new ArrayList<>();
+    int currentCount = 0;
+    final boolean hasMore;
+    final String retContToken;
+
+    // TODO: If we want "better" results, we want to iterate cache like
+    //  listKeys do. But that complicates the iteration logic by quite a bit.
+    //  And if we do that, we need to refactor listKeys as well to dedup.
+
+    final Table<String, OmKeyInfo> okTable, kTable;
+    okTable = getOpenKeyTable(bucketLayout);
+    // keyTable required to check key hsync metadata. TODO: HDDS-10077
+    kTable = getKeyTable(bucketLayout);
+
+    // No lock required since table iterator creates a "snapshot"
+    try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
+             openKeyIter = okTable.iterator()) {
+      KeyValue<String, OmKeyInfo> kv;
+      kv = openKeyIter.seek(dbContTokenPrefix);
+      if (hasContToken && kv.getKey().equals(dbContTokenPrefix)) {
+        // Skip one entry when cont token is specified and the current entry
+        // key is exactly the same as cont token.
+        openKeyIter.next();
+      }
+      while (currentCount < maxKeys && openKeyIter.hasNext()) {
+        kv = openKeyIter.next();
+        if (kv != null && kv.getKey().startsWith(dbOpenKeyPrefix)) {
+          String dbKey = kv.getKey();
+          long clientID = OMMetadataManager.getClientIDFromOpenKeyDBKey(dbKey);
+          OmKeyInfo omKeyInfo = kv.getValue();
+
+          // Trim client ID to get the keyTable dbKey
+          int lastSlashIdx = dbKey.lastIndexOf(OM_KEY_PREFIX);
+          String ktDbKey = dbKey.substring(0, lastSlashIdx);
+          // Check whether the key has been hsync'ed by checking keyTable
+          checkAndUpdateKeyHsyncStatus(omKeyInfo, ktDbKey, kTable);
+
+          openKeySessionList.add(
+              new OpenKeySession(clientID, omKeyInfo,
+                  omKeyInfo.getLatestVersionLocations().getVersion()));
+          currentCount++;
+        }
+      }
+
+      // Set hasMore flag as a hint for client-side pagination
+      if (openKeyIter.hasNext()) {
+        KeyValue<String, OmKeyInfo> nextKv = openKeyIter.next();
+        hasMore = nextKv != null && 
nextKv.getKey().startsWith(dbOpenKeyPrefix);
+      } else {
+        hasMore = false;
+      }
+
+      // Set continuation token
+      retContToken = hasMore ? kv.getKey() : null;
+    }
+
+    return new ListOpenFilesResult(
+        getTotalOpenKeyCount(),
+        hasMore,
+        retContToken,
+        openKeySessionList);
+  }
+
+  /**
+   * Check and update OmKeyInfo from OpenKeyTable with hsync status in 
KeyTable.
+   */
+  private void checkAndUpdateKeyHsyncStatus(OmKeyInfo omKeyInfo,
+                                            String dbKey,
+                                            Table<String, OmKeyInfo> kTable)
+      throws IOException {
+    OmKeyInfo ktOmKeyInfo = kTable.get(dbKey);
+    if (ktOmKeyInfo != null) {
+      // The same key in OpenKeyTable also exists in KeyTable, indicating
+      // the key has been hsync'ed
+      String hsyncClientId = ktOmKeyInfo.getMetadata().get(HSYNC_CLIENT_ID);
+      // Append HSYNC_CLIENT_ID to OmKeyInfo to be returned to the client
+      omKeyInfo.getMetadata().put(HSYNC_CLIENT_ID, hsyncClientId);
+    }
+  }
+
   @Override
   public ListKeysResult listKeys(String volumeName, String bucketName,
                                  String startKey, String keyPrefix, int 
maxKeys)
@@ -1732,6 +1835,13 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
     return getMultipartInfoTable().isExist(multipartInfoDbKey);
   }
 
+  @Override
+  public long getTotalOpenKeyCount() throws IOException {
+    // Get an estimated key count of OpenKeyTable + OpenFileTable
+    return openKeyTable.getEstimatedKeyCount()
+        + openFileTable.getEstimatedKeyCount();
+  }
+
   @Override
   public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
       int count, BucketLayout bucketLayout) throws IOException {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 142b211b73..fe2b2e7332 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -43,6 +43,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.StringTokenizer;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
@@ -88,6 +89,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.OzoneManagerVersion;
+import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
 import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob;
 import org.apache.hadoop.ozone.om.lock.OMLockDetails;
 import org.apache.hadoop.ozone.om.ratis_snapshot.OmRatisSnapshotProvider;
@@ -260,6 +262,7 @@ import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAU
 import static org.apache.hadoop.ozone.OzoneConsts.DB_TRANSIENT_MARKER;
 import static org.apache.hadoop.ozone.OzoneConsts.DEFAULT_OM_UPDATE_ID;
 import static org.apache.hadoop.ozone.OzoneConsts.LAYOUT_VERSION_KEY;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_FILE;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_TEMP_FILE;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
@@ -298,7 +301,9 @@ import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DETE
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FEATURE_NOT_ENABLED;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INTERNAL_ERROR;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_PATH;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_SUPPORTED_OPERATION;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PERMISSION_DENIED;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
 import static 
org.apache.hadoop.ozone.om.s3.S3SecretStoreConfigurationKeys.DEFAULT_SECRET_STORAGE_TYPE;
@@ -3187,14 +3192,100 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   }
 
   @Override
-  public void transferLeadership(String newLeaderId)
+  public ListOpenFilesResult listOpenFiles(String path,
+                                           int maxKeys,
+                                           String contToken)
       throws IOException {
-    final UserGroupInformation ugi = getRemoteUser();
-    if (!isAdmin(ugi)) {
-      throw new OMException(
-          "Only Ozone admins are allowed to transfer raft leadership.",
-          PERMISSION_DENIED);
+
+    metrics.incNumListOpenFiles();
+    checkAdminUserPrivilege("list open files.");
+
+    // Using final to make sure they are assigned once and only once in
+    // every branch.
+    final String dbOpenKeyPrefix, dbContTokenPrefix;
+    final String volumeName, bucketName;
+    final BucketLayout bucketLayout;
+
+    // Process path prefix
+    if (path == null || path.isEmpty() || path.equals(OM_KEY_PREFIX)) {
+      // path is root
+      dbOpenKeyPrefix = "";
+      volumeName = "";
+      bucketName = "";
+      // default to FSO's OpenFileTable. TODO: client option to pass 
OBS/LEGACY?
+      bucketLayout = BucketLayout.FILE_SYSTEM_OPTIMIZED;
+    } else {
+      // path is bucket or key prefix, break it down to volume, bucket, prefix
+      StringTokenizer tokenizer = new StringTokenizer(path, OM_KEY_PREFIX);
+      // Validate path to avoid NoSuchElementException
+      if (tokenizer.countTokens() < 2) {
+        metrics.incNumListOpenFilesFails();
+        throw new OMException("Invalid path: " + path + ". " +
+            "Only root level or bucket level path is supported at this time",
+            INVALID_PATH);
+      }
+
+      volumeName = tokenizer.nextToken();
+      bucketName = tokenizer.nextToken();
+
+      OmBucketInfo bucketInfo;
+      try {
+        // as expected, getBucketInfo throws if volume or bucket does not exist
+        bucketInfo = getBucketInfo(volumeName, bucketName);
+      } catch (OMException ex) {
+        metrics.incNumListOpenFilesFails();
+        throw ex;
+      } catch (IOException ex) {
+        // Wrap IOException in OMException
+        metrics.incNumListOpenFilesFails();
+        throw new OMException(ex.getMessage(), NOT_SUPPORTED_OPERATION);
+      }
+
+      final String keyPrefix;
+      if (tokenizer.hasMoreTokens()) {
+        // Collect the rest but trim the leading "/"
+        keyPrefix = tokenizer.nextToken("").substring(1);
+      } else {
+        keyPrefix = "";
+      }
+
+      // Determine dbKey prefix based on the bucket type
+      bucketLayout = bucketInfo.getBucketLayout();
+      switch (bucketLayout) {
+      case FILE_SYSTEM_OPTIMIZED:
+        dbOpenKeyPrefix = metadataManager.getOzoneKeyFSO(
+            volumeName, bucketName, keyPrefix);
+        break;
+      case OBJECT_STORE:
+      case LEGACY:
+        dbOpenKeyPrefix = metadataManager.getOzoneKey(
+            volumeName, bucketName, keyPrefix);
+        break;
+      default:
+        metrics.incNumListOpenFilesFails();
+        throw new OMException("Unsupported bucket layout: " +
+            bucketInfo.getBucketLayout(), NOT_SUPPORTED_OPERATION);
+      }
+    }
+
+    // Process cont. token
+    if (contToken == null || contToken.isEmpty()) {
+      // if a continuation token is not specified
+      dbContTokenPrefix = dbOpenKeyPrefix;
+    } else {
+      dbContTokenPrefix = contToken;
     }
+
+    // arg processing done. call inner impl (table iteration)
+    return metadataManager.listOpenFiles(
+        bucketLayout, maxKeys, dbOpenKeyPrefix,
+        !StringUtils.isEmpty(contToken), dbContTokenPrefix);
+  }
+
+  @Override
+  public void transferLeadership(String newLeaderId)
+      throws IOException {
+    checkAdminUserPrivilege("transfer raft leadership.");
     if (!isRatisEnabled) {
       throw new IOException("OM HA not enabled.");
     }
@@ -3311,7 +3402,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     if (!isAdmin(ugi)) {
       final OMException omEx = new OMException(
           "Only Ozone admins are allowed to list tenants.", PERMISSION_DENIED);
-      AUDIT.logWriteFailure(buildAuditMessageForFailure(
+      AUDIT.logReadFailure(buildAuditMessageForFailure(
           OMAction.LIST_TENANT, new LinkedHashMap<>(), omEx));
       throw omEx;
     }
@@ -4781,13 +4872,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   public String printCompactionLogDag(String fileNamePrefix,
                                       String graphType)
       throws IOException {
-
-    final UserGroupInformation ugi = getRemoteUser();
-    if (!isAdmin(ugi)) {
-      throw new OMException(
-          "Only Ozone admins are allowed to print compaction DAG.",
-          PERMISSION_DENIED);
-    }
+    checkAdminUserPrivilege("print compaction DAG.");
 
     if (StringUtils.isBlank(fileNamePrefix)) {
       fileNamePrefix = "dag-";
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index d96637b461..8918cf070b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.ozone.om.helpers.ListKeysLightResult;
 import org.apache.hadoop.ozone.om.helpers.ListKeysResult;
 import org.apache.hadoop.ozone.om.helpers.DBUpdates;
 import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
+import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -57,6 +58,7 @@ import 
org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatusLight;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
@@ -80,6 +82,8 @@ import 
org.apache.hadoop.ozone.om.upgrade.DisallowedUntilLayoutVersion;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelSnapshotDiffRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelSnapshotDiffResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListOpenFilesRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListOpenFilesResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListSnapshotDiffJobRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListSnapshotDiffJobResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessRequest;
@@ -249,6 +253,11 @@ public class OzoneManagerRequestHandler implements 
RequestHandler {
             listMultipartUploads(request.getListMultipartUploadsRequest());
         responseBuilder.setListMultipartUploadsResponse(response);
         break;
+      case ListOpenFiles:
+        ListOpenFilesResponse listOpenFilesResponse = listOpenFiles(
+            request.getListOpenFilesRequest(), request.getVersion());
+        responseBuilder.setListOpenFilesResponse(listOpenFilesResponse);
+        break;
       case ServiceList:
         ServiceListResponse serviceListResponse = getServiceList(
             request.getServiceListRequest());
@@ -921,6 +930,31 @@ public class OzoneManagerRequestHandler implements 
RequestHandler {
     return resp;
   }
 
+  private ListOpenFilesResponse listOpenFiles(ListOpenFilesRequest req,
+                                              int clientVersion)
+      throws IOException {
+    ListOpenFilesResponse.Builder resp = ListOpenFilesResponse.newBuilder();
+
+    ListOpenFilesResult res =
+        impl.listOpenFiles(req.getPath(), req.getCount(), req.getToken());
+    // TODO: Is there a clean way to avoid ser-de for responses:
+    //  OM does: ListOpenFilesResult -> ListOpenFilesResponse
+    //  Client : ListOpenFilesResponse -> ListOpenFilesResult
+
+    resp.setTotalOpenKeyCount(res.getTotalOpenKeyCount());
+    resp.setHasMore(res.hasMore());
+    if (res.getContinuationToken() != null) {
+      resp.setContinuationToken(res.getContinuationToken());
+    }
+
+    for (OpenKeySession e : res.getOpenKeys()) {
+      resp.addClientID(e.getId());
+      resp.addKeyInfo(e.getKeyInfo().getProtobuf(clientVersion));
+    }
+
+    return resp.build();
+  }
+
   private ServiceListResponse getServiceList(ServiceListRequest request)
       throws IOException {
     ServiceListResponse.Builder resp = ServiceListResponse.newBuilder();
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
index e6debcdc23..451417ba3d 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
@@ -24,11 +24,13 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
@@ -44,11 +46,13 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.File;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
@@ -558,36 +562,120 @@ public class TestOmMetadataManager {
 
   }
 
-  private static BucketLayout getDefaultBucketLayout() {
-    return BucketLayout.DEFAULT;
-  }
+  /**
+   * Tests inner impl of listOpenFiles with different bucket types with and
+   * without pagination. NOTE: This UT does NOT test hsync here since the hsync
+   * status check is done purely on the client side.
+   * @param bucketLayout BucketLayout
+   */
+  @ParameterizedTest
+  @EnumSource
+  public void testListOpenFiles(BucketLayout bucketLayout) throws Exception {
+    final long clientID = 1000L;
 
-  @Test
-  public void testGetExpiredOpenKeys() throws Exception {
-    testGetExpiredOpenKeys(BucketLayout.DEFAULT);
-  }
+    String volumeName = "volume-lof";
+    String bucketName = "bucket-" + bucketLayout.name().toLowerCase();
+    String keyPrefix = "key";
 
-  @Test
-  public void testGetExpiredOpenKeysExcludeMPUs() throws Exception {
-    testGetExpiredOpenKeysExcludeMPUKeys(BucketLayout.DEFAULT);
-  }
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager, bucketLayout);
 
-  @Test
-  public void testGetExpiredOpenKeysFSO() throws Exception {
-    testGetExpiredOpenKeys(BucketLayout.FILE_SYSTEM_OPTIMIZED);
-  }
+    long volumeId = -1L, bucketId = -1L;
+    if (bucketLayout.isFileSystemOptimized()) {
+      volumeId = omMetadataManager.getVolumeId(volumeName);
+      bucketId = omMetadataManager.getBucketId(volumeName, bucketName);
+    }
 
-  @Test
-  public void testGetExpiredOpenKeysExcludeMPUsFSO() throws Exception {
-    testGetExpiredOpenKeysExcludeMPUKeys(BucketLayout.FILE_SYSTEM_OPTIMIZED);
+    int numOpenKeys = 3;
+    List<String> openKeys = new ArrayList<>();
+    for (int i = 0; i < numOpenKeys; i++) {
+      final OmKeyInfo keyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName,
+          bucketName, keyPrefix + i, HddsProtos.ReplicationType.RATIS,
+          HddsProtos.ReplicationFactor.ONE, 0L, Time.now());
+
+      final String dbOpenKeyName;
+      if (bucketLayout.isFileSystemOptimized()) {
+        keyInfo.setParentObjectID(i);
+        keyInfo.setFileName(OzoneFSUtils.getFileName(keyInfo.getKeyName()));
+        OMRequestTestUtils.addFileToKeyTable(true, false,
+            keyInfo.getFileName(), keyInfo, clientID, 0L, omMetadataManager);
+        dbOpenKeyName = omMetadataManager.getOpenFileName(volumeId, bucketId,
+            keyInfo.getParentObjectID(), keyInfo.getFileName(), clientID);
+      } else {
+        OMRequestTestUtils.addKeyToTable(true, false,
+            keyInfo, clientID, 0L, omMetadataManager);
+        dbOpenKeyName = omMetadataManager.getOpenKey(volumeName, bucketName,
+            keyInfo.getKeyName(), clientID);
+      }
+      openKeys.add(dbOpenKeyName);
+    }
+
+    String dbPrefix;
+    if (bucketLayout.isFileSystemOptimized()) {
+      dbPrefix = omMetadataManager.getOzoneKeyFSO(volumeName, bucketName, "");
+    } else {
+      dbPrefix = omMetadataManager.getOzoneKey(volumeName, bucketName, "");
+    }
+
+    // Without pagination
+    ListOpenFilesResult res = omMetadataManager.listOpenFiles(
+        bucketLayout, 100, dbPrefix, false, dbPrefix);
+
+    assertEquals(numOpenKeys, res.getTotalOpenKeyCount());
+    assertEquals(false, res.hasMore());
+    List<OpenKeySession> keySessionList = res.getOpenKeys();
+    assertEquals(numOpenKeys, keySessionList.size());
+    // Verify that every single open key shows up in the result, and in order
+    for (int i = 0; i < numOpenKeys; i++) {
+      OpenKeySession keySession = keySessionList.get(i);
+      assertEquals(keyPrefix + i, keySession.getKeyInfo().getKeyName());
+      assertEquals(clientID, keySession.getId());
+      assertEquals(0, keySession.getOpenVersion());
+    }
+
+    // With pagination
+    int pageSize = 2;
+    int numExpectedKeys = pageSize;
+    res = omMetadataManager.listOpenFiles(
+        bucketLayout, pageSize, dbPrefix, false, dbPrefix);
+    // total open key count should still be 3
+    assertEquals(numOpenKeys, res.getTotalOpenKeyCount());
+    // hasMore should have been set
+    assertEquals(true, res.hasMore());
+    keySessionList = res.getOpenKeys();
+    assertEquals(numExpectedKeys, keySessionList.size());
+    for (int i = 0; i < numExpectedKeys; i++) {
+      OpenKeySession keySession = keySessionList.get(i);
+      assertEquals(keyPrefix + i, keySession.getKeyInfo().getKeyName());
+      assertEquals(clientID, keySession.getId());
+      assertEquals(0, keySession.getOpenVersion());
+    }
+
+    // Get the second page
+    res = omMetadataManager.listOpenFiles(
+        bucketLayout, pageSize, dbPrefix, true, res.getContinuationToken());
+    numExpectedKeys = numOpenKeys - pageSize;
+    // total open key count should still be 3
+    assertEquals(numOpenKeys, res.getTotalOpenKeyCount());
+    assertEquals(false, res.hasMore());
+    keySessionList = res.getOpenKeys();
+    assertEquals(numExpectedKeys, keySessionList.size());
+    for (int i = 0; i < numExpectedKeys; i++) {
+      OpenKeySession keySession = keySessionList.get(i);
+      assertEquals(keyPrefix + (pageSize + i),
+          keySession.getKeyInfo().getKeyName());
+      assertEquals(clientID, keySession.getId());
+      assertEquals(0, keySession.getOpenVersion());
+    }
   }
 
-  @Test
-  public void testGetExpiredMultipartUploads() throws Exception {
-    testGetExpiredMPUs();
+  private static BucketLayout getDefaultBucketLayout() {
+    return BucketLayout.DEFAULT;
   }
 
-  private void testGetExpiredOpenKeys(BucketLayout bucketLayout)
+  @ParameterizedTest
+  @EnumSource
+  public void testGetExpiredOpenKeys(BucketLayout bucketLayout)
       throws Exception {
     final String bucketName = UUID.randomUUID().toString();
     final String volumeName = UUID.randomUUID().toString();
@@ -665,7 +753,9 @@ public class TestOmMetadataManager {
     assertThat(expiredKeys).containsAll(names);
   }
 
-  private void testGetExpiredOpenKeysExcludeMPUKeys(
+  @ParameterizedTest
+  @EnumSource
+  public void testGetExpiredOpenKeysExcludeMPUKeys(
       BucketLayout bucketLayout) throws Exception {
     final String bucketName = UUID.randomUUID().toString();
     final String volumeName = UUID.randomUUID().toString();
@@ -753,7 +843,8 @@ public class TestOmMetadataManager {
         .isEmpty());
   }
 
-  private void testGetExpiredMPUs() throws Exception {
+  @Test
+  public void testGetExpiredMPUs() throws Exception {
     final String bucketName = UUID.randomUUID().toString();
     final String volumeName = UUID.randomUUID().toString();
     final int numExpiredMPUs = 4;
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/ListOpenFilesSubCommand.java
 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/ListOpenFilesSubCommand.java
new file mode 100644
index 0000000000..9ede45a80a
--- /dev/null
+++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/ListOpenFilesSubCommand.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.admin.om;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.server.JsonUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import picocli.CommandLine;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/**
+ * Handler of ozone admin om list-open-files command.
+ */
[email protected](
+    name = "list-open-files",
+    aliases = {"list-open-keys", "lof", "lok"},
+    description = "Lists open files (keys) in Ozone Manager.",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class
+)
+public class ListOpenFilesSubCommand implements Callable<Void> {
+
+  @CommandLine.ParentCommand
+  private OMAdmin parent;
+
+  @CommandLine.Option(
+      names = {"--service-id", "--om-service-id"},
+      description = "Ozone Manager Service ID",
+      required = false
+  )
+  private String omServiceId;
+
+  @CommandLine.Option(
+      names = {"--service-host"},
+      description = "Ozone Manager Host. If OM HA is enabled, use --service-id 
instead. "
+          + "If you must use --service-host with OM HA, this must point 
directly to the leader OM. "
+          + "This option is required when --service-id is not provided or when 
HA is not enabled."
+  )
+  private String omHost;
+
+  @CommandLine.Option(names = { "--json" },
+      defaultValue = "false",
+      description = "Format output as JSON")
+  private boolean json;
+
+  // Conforms to ListOptions, but not all in ListOptions applies here thus
+  // not using that directly
+  @CommandLine.Option(
+      names = {"-p", "--prefix"},
+      description = "Filter results by the specified path on the server side.",
+      defaultValue = "/"
+  )
+  private String pathPrefix;
+
+  @CommandLine.Option(
+      names = {"-l", "--length"},
+      description = "Maximum number of items to list",
+      defaultValue = "100"
+  )
+  private int limit;
+
+  @CommandLine.Option(
+      names = {"-s", "--start"},
+      description = "The item to start the listing from.\n" +
+          "i.e. continuation token. " +
+          "This will be excluded from the result.",
+      defaultValue = ""
+  )
+  private String startItem;
+
+  @Override
+  public Void call() throws Exception {
+
+    if (StringUtils.isEmpty(omServiceId) && StringUtils.isEmpty(omHost)) {
+      System.err.println("Error: Please specify -id or -host");
+      return null;
+    }
+
+    OzoneManagerProtocol ozoneManagerClient =
+        parent.createOmClient(omServiceId, omHost, false);
+
+    ListOpenFilesResult res =
+        ozoneManagerClient.listOpenFiles(pathPrefix, limit, startItem);
+
+    if (json) {
+      // Print detailed JSON
+      printOpenKeysListAsJson(res);
+    } else {
+      // Human friendly output
+      printOpenKeysList(res);
+    }
+
+    return null;
+  }
+
+  private void printOpenKeysListAsJson(ListOpenFilesResult res)
+      throws IOException {
+    System.out.println(JsonUtils.toJsonStringWithDefaultPrettyPrinter(res));
+  }
+
+  private void printOpenKeysList(ListOpenFilesResult res) {
+
+    List<OpenKeySession> openFileList = res.getOpenKeys();
+
+    String msg = res.getTotalOpenKeyCount() +
+        " total open files (est.). Showing " + openFileList.size() +
+        " open files (limit " + limit + ") under path prefix:\n  " + 
pathPrefix;
+
+    if (startItem != null && !startItem.isEmpty()) {
+      msg += "\nafter continuation token:\n  " + startItem;
+    }
+    msg += "\n\nClient ID\t\tCreation time\tHsync'ed\tOpen File Path";
+    System.out.println(msg);
+
+    for (OpenKeySession e : openFileList) {
+      long clientId = e.getId();
+      OmKeyInfo omKeyInfo = e.getKeyInfo();
+      String line = clientId + "\t" + omKeyInfo.getCreationTime() + "\t";
+
+      if (omKeyInfo.isHsync()) {
+        String hsyncClientIdStr =
+            omKeyInfo.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID);
+        long hsyncClientId = Long.parseLong(hsyncClientIdStr);
+        if (clientId == hsyncClientId) {
+          line += "Yes\t\t";
+        } else {
+          // last hsync'ed with a different client ID than the client that
+          // initially opens the file (!)
+          line += "Yes w/ cid " + hsyncClientIdStr + "\t";
+        }
+      } else {
+        line += "No\t\t";
+      }
+
+      line += getFullPathFromKeyInfo(omKeyInfo);
+
+      System.out.println(line);
+    }
+
+    // Compose next batch's command
+    if (res.hasMore()) {
+      String nextBatchCmd = getCmdForNextBatch(res.getContinuationToken());
+
+      System.out.println("\n" +
+          "To get the next batch of open keys, run:\n  " + nextBatchCmd);
+    } else {
+      System.out.println("\nReached the end of the list.");
+    }
+  }
+
+  /**
+   * @return the command to get the next batch of open keys
+   */
+  private String getCmdForNextBatch(String lastElementFullPath) {
+    String nextBatchCmd = "ozone admin om lof";
+    if (omServiceId != null && !omServiceId.isEmpty()) {
+      nextBatchCmd += " -id=" + omServiceId;
+    }
+    if (omHost != null && !omHost.isEmpty()) {
+      nextBatchCmd += " -host=" + omHost;
+    }
+    if (json) {
+      nextBatchCmd += " --json";
+    }
+    nextBatchCmd += " --length=" + limit;
+    if (pathPrefix != null && !pathPrefix.isEmpty()) {
+      nextBatchCmd += " --prefix=" + pathPrefix;
+    }
+    nextBatchCmd += " --start=" + lastElementFullPath;
+    return nextBatchCmd;
+  }
+
+  private String getFullPathFromKeyInfo(OmKeyInfo oki) {
+    return "/" + oki.getVolumeName() +
+        "/" + oki.getBucketName() +
+        "/" + oki.getPath();
+  }
+
+}
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/OMAdmin.java
 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/OMAdmin.java
index ce7d4ed7a7..3162c55635 100644
--- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/OMAdmin.java
+++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/OMAdmin.java
@@ -53,6 +53,7 @@ import java.util.Collection;
     versionProvider = HddsVersionProvider.class,
     subcommands = {
         FinalizeUpgradeSubCommand.class,
+        ListOpenFilesSubCommand.class,
         GetServiceRolesSubcommand.class,
         PrepareSubCommand.class,
         CancelPrepareSubCommand.class,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to