This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-7593 by this push:
new 082d759448 HDDS-8830. Add admin CLI to list open files (#5920)
082d759448 is described below
commit 082d759448e3995b9330f8e914bb5bab171acaf2
Author: Siyao Meng <[email protected]>
AuthorDate: Thu Jan 18 19:24:12 2024 -0800
HDDS-8830. Add admin CLI to list open files (#5920)
---
hadoop-hdds/docs/content/tools/Admin.md | 141 +++++++++++++-
.../hadoop/ozone/client/MockOmTransport.java | 11 ++
.../main/java/org/apache/hadoop/ozone/OmUtils.java | 1 +
.../ozone/om/helpers/ListOpenFilesResult.java | 115 ++++++++++++
.../hadoop/ozone/om/helpers/OpenKeySession.java | 3 +
.../ozone/om/protocol/OzoneManagerProtocol.java | 12 ++
...OzoneManagerProtocolClientSideTranslatorPB.java | 29 +++
.../dist/src/main/compose/ozone/docker-config | 2 +
.../org/apache/hadoop/ozone/om/TestOmMetrics.java | 12 +-
.../hadoop/ozone/shell/TestOzoneShellHA.java | 139 ++++++++++++++
.../ozone/shell/TestOzoneShellHAWithFSO.java | 2 +
.../src/main/proto/OmClientProtocol.proto | 24 +++
.../apache/hadoop/ozone/om/OMMetadataManager.java | 48 ++++-
.../org/apache/hadoop/ozone/audit/OMAction.java | 4 +-
.../java/org/apache/hadoop/ozone/om/OMMetrics.java | 12 +-
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 116 +++++++++++-
.../org/apache/hadoop/ozone/om/OzoneManager.java | 113 ++++++++++--
.../protocolPB/OzoneManagerRequestHandler.java | 34 ++++
.../hadoop/ozone/om/TestOmMetadataManager.java | 139 +++++++++++---
.../ozone/admin/om/ListOpenFilesSubCommand.java | 202 +++++++++++++++++++++
.../org/apache/hadoop/ozone/admin/om/OMAdmin.java | 1 +
21 files changed, 1112 insertions(+), 48 deletions(-)
diff --git a/hadoop-hdds/docs/content/tools/Admin.md
b/hadoop-hdds/docs/content/tools/Admin.md
index a05065e8eb..e89331230f 100644
--- a/hadoop-hdds/docs/content/tools/Admin.md
+++ b/hadoop-hdds/docs/content/tools/Admin.md
@@ -32,4 +32,143 @@ And quick overview about the available functionalities:
* `ozone admin replicationmanager`: Can be used to check the status of the
replications (and start / stop replication in case of emergency).
* `ozone admin om`: Ozone Manager HA related tool to get information about
the current cluster.
-For more detailed usage see the output of the `--help`.
+For more detailed usage see the output of `--help`.
+
+```bash
+$ ozone admin --help
+Usage: ozone admin [-hV] [--verbose] [-conf=<configurationPath>]
+ [-D=<String=String>]... [COMMAND]
+Developer tools for Ozone Admin operations
+ -conf=<configurationPath>
+
+ -D, --set=<String=String>
+
+ -h, --help Show this help message and exit.
+ -V, --version Print version information and exit.
+ --verbose More verbose output. Show the stack trace of the errors.
+Commands:
+ containerbalancer ContainerBalancer specific operations
+ replicationmanager ReplicationManager specific operations
+ safemode Safe mode specific operations
+ printTopology Print a tree of the network topology as reported by SCM
+ cert Certificate related operations
+ container Container specific operations
+ datanode Datanode specific operations
+ pipeline Pipeline specific operations
+ namespace Namespace Summary specific admin operations
+ om Ozone Manager specific admin operations
+ reconfig Dynamically reconfigure server without restarting it
+ scm Ozone Storage Container Manager specific admin operations
+```
+
+Some of those subcommand usages has been detailed in their dedicated feature
documentation pages. For instance, [Decommissioning]({{<ref
"feature/Decommission.md">}}), [Non-Rolling Upgrades and Downgrades]({{<ref
"feature/Nonrolling-Upgrade.md">}}).
+
+
+## List open files
+
+List open files admin command lists open keys in Ozone Manager's
`OpenKeyTable`.
+Works for all bucket types.
+Argument `--prefix` could be root (`/`), path to a bucket (`/vol1/buck`) or a
key prefix (for FSO buckets the key prefix could contain parent object ID). But
it can't be a volume.
+
+```bash
+$ ozone admin om lof --help
+Usage: ozone admin om list-open-files [-hV] [--json] [-l=<limit>]
+ [-p=<pathPrefix>] [-s=<startItem>]
+ [--service-host=<omHost>]
+ [--service-id=<omServiceId>]
+Lists open files (keys) in Ozone Manager.
+ -h, --help Show this help message and exit.
+ --json Format output as JSON
+ -l, --length=<limit> Maximum number of items to list
+ -p, --prefix=<pathPrefix> Filter results by the specified path on the server
+ side.
+ -s, --start=<startItem> The item to start the listing from.
+ i.e. continuation token. This will be excluded from
+ the result.
+ --service-host=<omHost>
+ Ozone Manager Host. If OM HA is enabled, use
+ --service-id instead. If you must use
+ --service-host with OM HA, this must point
+ directly to the leader OM. This option is
+ required when --service-id is not provided or
+ when HA is not enabled.
+ --service-id, --om-service-id=<omServiceId>
+ Ozone Manager Service ID
+ -V, --version Print version information and exit.
+```
+
+### Example usages
+
+- In human-readable format, list open files (keys) under bucket
`/volumelof/buck1` with a batch size of 3:
+
+```bash
+$ ozone admin om lof --service-id=om-service-test1 --length=3
--prefix=/volumelof/buck1
+```
+
+```bash
+5 total open files (est.). Showing 3 open files (limit 3) under path prefix:
+ /volume-lof/buck1
+
+Client ID Creation time Hsync'ed Open File Path
+111726338148007937 1704808626523 No
/volume-lof/buck1/-9223372036854774527/key0
+111726338151415810 1704808626578 No
/volume-lof/buck1/-9223372036854774527/key1
+111726338152071171 1704808626588 No
/volume-lof/buck1/-9223372036854774527/key2
+
+To get the next batch of open keys, run:
+ ozone admin om lof -id=om-service-test1 --length=3
--prefix=/volume-lof/buck1
--start=/-9223372036854775552/-9223372036854775040/-9223372036854774527/key2/111726338152071171
+```
+
+- In JSON, list open files (keys) under bucket `/volumelof/buck1` with a batch
size of 3:
+
+```bash
+$ ozone admin om lof --service-id=om-service-test1 --length=3
--prefix=/volumelof/buck1 --json
+```
+
+```json
+{
+ "openKeys" : [ {
+ "keyInfo" : {
+ "metadata" : { },
+ "objectID" : -9223372036854774015,
+ "updateID" : 7,
+ "parentObjectID" : -9223372036854774527,
+ "volumeName" : "volume-lof",
+ "bucketName" : "buck1",
+ "keyName" : "key0",
+ "dataSize" : 4194304,
+ "keyLocationVersions" : [ ... ],
+ "creationTime" : 1704808722487,
+ "modificationTime" : 1704808722487,
+ "replicationConfig" : {
+ "replicationFactor" : "THREE",
+ "requiredNodes" : 3,
+ "replicationType" : "RATIS"
+ },
+ "fileName" : "key0",
+ "acls" : [ ... ],
+ "path" : "-9223372036854774527/key0",
+ "file" : true,
+ "replicatedSize" : 12582912,
+ "objectInfo" : "OMKeyInfo{volume='volume-lof', bucket='buck1',
key='key0', dataSize='4194304', creationTime='1704808722487',
objectID='-9223372036854774015', parentID='-9223372036854774527',
replication='RATIS/THREE', fileChecksum='null}",
+ "hsync" : false,
+ "latestVersionLocations" : { ... },
+ "updateIDset" : true
+ },
+ "openVersion" : 0,
+ "clientId" : 111726344437039105
+ }, {
+ "keyInfo" : { ... },
+ "openVersion" : 0,
+ "clientId" : 111726344440578050
+ }, {
+ "keyInfo" : { ... },
+ "openVersion" : 0,
+ "clientId" : 111726344441233411
+ } ],
+ "totalOpenKeyCount" : 5,
+ "hasMore" : true,
+ "contToken" :
"/-9223372036854775552/-9223372036854775040/-9223372036854774527/key2/111726344441233411"
+}
+```
+
+Note in JSON output mode, field `contToken` won't show up at all in the result
if there are no more entries after the batch (i.e. when `hasMore` is `false`).
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
index e91a71a856..e4a8a80a63 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
@@ -45,6 +45,8 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoVol
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocationList;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListOpenFilesRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListOpenFilesResponse;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LookupKeyRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LookupKeyResponse;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
@@ -125,6 +127,10 @@ public class MockOmTransport implements OmTransport {
return response(payload,
r -> r.setServiceListResponse(
serviceList(payload.getServiceListRequest())));
+ case ListOpenFiles:
+ return response(payload,
+ r -> r.setListOpenFilesResponse(
+ listOpenFiles(payload.getListOpenFilesRequest())));
case AllocateBlock:
return response(payload, r -> r.setAllocateBlockResponse(
allocateBlock(payload.getAllocateBlockRequest())));
@@ -324,6 +330,11 @@ public class MockOmTransport implements OmTransport {
.build();
}
+ private ListOpenFilesResponse listOpenFiles(
+ ListOpenFilesRequest listOpenFilesRequest) {
+ return ListOpenFilesResponse.newBuilder().build();
+ }
+
private OMResponse response(OMRequest payload,
Function<OMResponse.Builder, OMResponse.Builder> function) {
Builder builder = OMResponse.newBuilder();
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index f23a703bd0..c6e410bb45 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -244,6 +244,7 @@ public final class OmUtils {
case ListKeysLight:
case ListTrash:
case ServiceList:
+ case ListOpenFiles:
case ListMultiPartUploadParts:
case GetFileStatus:
case LookupFile:
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ListOpenFilesResult.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ListOpenFilesResult.java
new file mode 100644
index 0000000000..67ef5dfec2
--- /dev/null
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ListOpenFilesResult.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.helpers;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Encapsulates the result of listOpenFiles. It has a list of
+ * {@link OpenKeySession} and a boolean flag indicating if there
+ * are more entries that are not fetched after the current batch of result.
+ */
+public class ListOpenFilesResult {
+ /**
+ * Number of total open files globally.
+ */
+ @JsonProperty("totalOpenKeyCount")
+ private final long totalOpenKeyCount;
+ /**
+ * True if there are more entries after this batch under the given path.
+ */
+ @JsonProperty("hasMore")
+ private final boolean hasMore;
+ /**
+ * True if there are more entries after this batch under the given path.
+ */
+ @JsonProperty("contToken")
+ private final String continuationToken;
+ /**
+ * List of open files. Each has client ID and OmKeyInfo.
+ */
+ private final List<OpenKeySession> openKeySessionList;
+
+ public ListOpenFilesResult(long totalOpenKeyCount,
+ boolean hasMore,
+ String continuationToken,
+ List<OpenKeySession> openKeySessionList) {
+ this.openKeySessionList = openKeySessionList;
+ this.hasMore = hasMore;
+ this.continuationToken = continuationToken;
+ this.totalOpenKeyCount = totalOpenKeyCount;
+ }
+
+ public ListOpenFilesResult(long totalOpenKeyCount,
+ boolean hasMore,
+ String continuationToken,
+ List<Long> clientIDsList,
+ List<KeyInfo> keyInfosList)
+ throws IOException {
+ this.openKeySessionList = getOpenKeySessionListFromPB(clientIDsList,
+ keyInfosList);
+ this.hasMore = hasMore;
+ this.continuationToken = continuationToken;
+ this.totalOpenKeyCount = totalOpenKeyCount;
+ }
+
+ /**
+ * Combines clientIDsList and keyInfosList into OpenKeySessionList for
+ * transfer to the client.
+ */
+ private List<OpenKeySession> getOpenKeySessionListFromPB(
+ List<Long> clientIDsList, List<KeyInfo> keyInfosList)
+ throws IOException {
+
+ Preconditions.checkArgument(clientIDsList.size() == keyInfosList.size(),
+ "clientIDsList size (" + clientIDsList.size() + ") should be " +
+ "the same as keyInfosList's (" + keyInfosList.size() + ")");
+
+ List<OpenKeySession> res = new ArrayList<>();
+
+ for (int i = 0; i < clientIDsList.size(); i++) {
+ OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfosList.get(i));
+ res.add(new OpenKeySession(clientIDsList.get(i),
+ omKeyInfo,
+ omKeyInfo.getLatestVersionLocations().getVersion()));
+ }
+ return res;
+ }
+
+ public long getTotalOpenKeyCount() {
+ return totalOpenKeyCount;
+ }
+
+ public boolean hasMore() {
+ return hasMore;
+ }
+
+ public String getContinuationToken() {
+ return continuationToken;
+ }
+
+ public List<OpenKeySession> getOpenKeys() {
+ return openKeySessionList;
+ }
+}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OpenKeySession.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OpenKeySession.java
index 11ee622494..d3b32815d8 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OpenKeySession.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OpenKeySession.java
@@ -17,12 +17,15 @@
*/
package org.apache.hadoop.ozone.om.helpers;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
/**
* This class represents a open key "session". A session here means a key is
* opened by a specific client, the client sends the handler to server, such
* that servers can recognize this client, and thus know how to close the key.
*/
public class OpenKeySession {
+ @JsonProperty("clientId")
private final long id;
private final OmKeyInfo keyInfo;
// the version of the key when it is being opened in this session.
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 824346058d..f60fe686d0 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.DBUpdates;
import org.apache.hadoop.ozone.om.helpers.DeleteTenantState;
import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
+import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDeleteKeys;
@@ -422,6 +423,17 @@ public interface OzoneManagerProtocol
ServiceInfoEx getServiceInfo() throws IOException;
+ /**
+ * List open files in OM.
+ * @param path One of: root "/", path to a bucket, key path, or key prefix
+ * @param maxKeys Limit the number of keys that can be returned in this
batch.
+ * @param contToken Continuation token.
+ * @return ListOpenFilesResult
+ * @throws IOException
+ */
+ ListOpenFilesResult listOpenFiles(String path, int maxKeys, String contToken)
+ throws IOException;
+
/**
* Transfer the raft leadership.
*
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 84c33db925..57a9d23ee9 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.om.helpers.DBUpdates;
import org.apache.hadoop.ozone.om.helpers.DeleteTenantState;
import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
+import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDeleteKeys;
@@ -134,6 +135,8 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListKey
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListKeysLightResponse;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListMultipartUploadsRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListMultipartUploadsResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListOpenFilesRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListOpenFilesResponse;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusResponse;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusLightResponse;
@@ -1795,6 +1798,32 @@ public final class
OzoneManagerProtocolClientSideTranslatorPB
resp.getCaCertificate(), resp.getCaCertsList());
}
+ @Override
+ public ListOpenFilesResult listOpenFiles(String path,
+ int maxKeys,
+ String contToken)
+ throws IOException {
+ ListOpenFilesRequest req = ListOpenFilesRequest.newBuilder()
+ .setPath(path)
+ .setCount(maxKeys)
+ .setToken(contToken)
+ .build();
+
+ OMRequest omRequest = createOMRequest(Type.ListOpenFiles)
+ .setListOpenFilesRequest(req)
+ .build();
+
+ final ListOpenFilesResponse resp = handleError(submitRequest(omRequest))
+ .getListOpenFilesResponse();
+
+ return new ListOpenFilesResult(
+ resp.getTotalOpenKeyCount(),
+ resp.getHasMore(),
+ resp.hasContinuationToken() ? resp.getContinuationToken() : null,
+ resp.getClientIDList(),
+ resp.getKeyInfoList());
+ }
+
@Override
public void transferLeadership(String newLeaderId)
throws IOException {
diff --git a/hadoop-ozone/dist/src/main/compose/ozone/docker-config
b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
index 729f036e15..27208cf654 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
@@ -54,6 +54,8 @@ OZONE-SITE.XML_hdds.scm.wait.time.after.safemode.exit=30s
OZONE-SITE.XML_dfs.container.ratis.datastream.enabled=true
+OZONE-SITE.XML_ozone.fs.hsync.enabled=true
+
OZONE_CONF_DIR=/etc/hadoop
OZONE_LOG_DIR=/var/log/hadoop
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
index 1a87b45079..0f8c82a558 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
@@ -305,7 +305,7 @@ public class TestOmMetrics {
doKeyOps(keyArgs);
MetricsRecordBuilder omMetrics = getMetrics("OMMetrics");
- assertCounter("NumKeyOps", 7L, omMetrics);
+ assertCounter("NumKeyOps", 8L, omMetrics);
assertCounter("NumKeyAllocate", 1L, omMetrics);
assertCounter("NumKeyLookup", 1L, omMetrics);
assertCounter("NumKeyDeletes", 1L, omMetrics);
@@ -313,12 +313,13 @@ public class TestOmMetrics {
assertCounter("NumTrashKeyLists", 1L, omMetrics);
assertCounter("NumKeys", 0L, omMetrics);
assertCounter("NumInitiateMultipartUploads", 1L, omMetrics);
+ assertCounter("NumListOpenFiles", 1L, omMetrics);
keyArgs = createKeyArgs(volumeName, bucketName,
new ECReplicationConfig("rs-3-2-1024K"));
doKeyOps(keyArgs);
omMetrics = getMetrics("OMMetrics");
- assertCounter("NumKeyOps", 14L, omMetrics);
+ assertCounter("NumKeyOps", 16L, omMetrics);
assertCounter("EcKeyCreateTotal", 1L, omMetrics);
keyArgs = createKeyArgs(volumeName, bucketName,
@@ -370,7 +371,7 @@ public class TestOmMetrics {
doKeyOps(keyArgs);
omMetrics = getMetrics("OMMetrics");
- assertCounter("NumKeyOps", 28L, omMetrics);
+ assertCounter("NumKeyOps", 31L, omMetrics);
assertCounter("NumKeyAllocate", 6L, omMetrics);
assertCounter("NumKeyLookup", 3L, omMetrics);
assertCounter("NumKeyDeletes", 4L, omMetrics);
@@ -813,6 +814,11 @@ public class TestOmMetrics {
writeClient.initiateMultipartUpload(keyArgs);
} catch (IOException ignored) {
}
+
+ try {
+ writeClient.listOpenFiles("", 100, "");
+ } catch (IOException ignored) {
+ }
}
private OmKeyArgs createKeyArgs(String volumeName, String bucketName,
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java
index 92381829f0..628a101ea9 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHA.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.UUID;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
@@ -71,6 +72,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.fs.FileSystem.TRASH_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
@@ -80,6 +83,7 @@ import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLU
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -89,7 +93,10 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
+import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,6 +113,7 @@ import picocli.CommandLine.RunLast;
* Inspired by TestS3Shell
*/
@Timeout(300)
+@TestMethodOrder(OrderAnnotation.class)
public class TestOzoneShellHA {
private static final Logger LOG =
@@ -140,6 +148,7 @@ public class TestOzoneShellHA {
@BeforeAll
public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setBoolean(OZONE_FS_HSYNC_ENABLED, true);
startCluster(conf);
}
@@ -549,6 +558,134 @@ public class TestOzoneShellHA {
execute(ozoneAdminShell, args);
}
+ @Test
+ public void testAdminCmdListOpenFiles()
+ throws IOException, InterruptedException {
+
+ OzoneConfiguration conf = cluster.getConf();
+ final String hostPrefix = OZONE_OFS_URI_SCHEME + "://" + omServiceId;
+
+ OzoneConfiguration clientConf = getClientConfForOFS(hostPrefix, conf);
+ clientConf.setBoolean(OZONE_FS_HSYNC_ENABLED, true);
+ FileSystem fs = FileSystem.get(clientConf);
+
+ assertNotEquals(fs.getConf().get(OZONE_FS_HSYNC_ENABLED),
+ "false", OZONE_FS_HSYNC_ENABLED + " is set to false " +
+ "by external force. Must be true to allow hsync to function");
+
+ final String volumeName = "volume-lof";
+ final String bucketName = "buck1";
+
+ String dir1 = hostPrefix +
+ OM_KEY_PREFIX + volumeName +
+ OM_KEY_PREFIX + bucketName +
+ OM_KEY_PREFIX + "dir1";
+ // Create volume, bucket, dir
+ assertTrue(fs.mkdirs(new Path(dir1)));
+ String keyPrefix = OM_KEY_PREFIX + "key";
+
+ final int numKeys = 5;
+ String[] keys = new String[numKeys];
+
+ for (int i = 0; i < numKeys; i++) {
+ keys[i] = dir1 + keyPrefix + i;
+ }
+
+ int pageSize = 3;
+ String pathToBucket = "/" + volumeName + "/" + bucketName;
+ FSDataOutputStream[] streams = new FSDataOutputStream[numKeys];
+
+ try {
+ // Create multiple keys and hold them open
+ for (int i = 0; i < numKeys; i++) {
+ streams[i] = fs.create(new Path(keys[i]));
+ streams[i].write(1);
+ }
+
+ // Wait for DB flush
+ cluster.getOzoneManager().awaitDoubleBufferFlush();
+
+ String[] args = new String[] {"om", "lof",
+ "--service-id", omServiceId,
+ "-l", String.valueOf(numKeys + 1), // pagination
+ "-p", pathToBucket};
+ // Run listopenfiles
+ execute(ozoneAdminShell, args);
+ String cmdRes = getStdOut();
+ // Should have retrieved all 5 open keys
+ for (int i = 0; i < numKeys; i++) {
+ assertTrue(cmdRes.contains(keyPrefix + i));
+ }
+
+ // Try pagination
+ args = new String[] {"om", "lof",
+ "--service-id", omServiceId,
+ "-l", String.valueOf(pageSize), // pagination
+ "-p", pathToBucket};
+ execute(ozoneAdminShell, args);
+ cmdRes = getStdOut();
+
+ // Should have retrieved the 1st page only (3 keys)
+ for (int i = 0; i < pageSize; i++) {
+ assertTrue(cmdRes.contains(keyPrefix + i));
+ }
+ for (int i = pageSize; i < numKeys; i++) {
+ assertFalse(cmdRes.contains(keyPrefix + i));
+ }
+ // No hsync'ed file/key at this point
+ assertFalse(cmdRes.contains("\tYes\t"));
+
+ // Get last line of the output which has the continuation token
+ String[] lines = cmdRes.split("\n");
+ String nextCmd = lines[lines.length - 1].trim();
+ String kw = "--start=";
+ String contToken =
+ nextCmd.substring(nextCmd.lastIndexOf(kw) + kw.length());
+
+ args = new String[] {"om", "lof",
+ "--service-id", omServiceId,
+ "-l", String.valueOf(pageSize), // pagination
+ "-p", pathToBucket,
+ "-s", contToken};
+ execute(ozoneAdminShell, args);
+ cmdRes = getStdOut();
+
+ // Should have retrieved the 2nd page only (2 keys)
+ for (int i = 0; i < pageSize - 1; i++) {
+ assertFalse(cmdRes.contains(keyPrefix + i));
+ }
+ // Note: key2 is shown in the continuation token prompt
+ for (int i = pageSize - 1; i < numKeys; i++) {
+ assertTrue(cmdRes.contains(keyPrefix + i));
+ }
+
+ // hsync last key
+ streams[numKeys - 1].hsync();
+ // Wait for flush
+ cluster.getOzoneManager().awaitDoubleBufferFlush();
+
+ execute(ozoneAdminShell, args);
+ cmdRes = getStdOut();
+
+ // Verify that only one key is hsync'ed
+ assertTrue(cmdRes.contains("\tYes\t"), "One key should be hsync'ed");
+ assertTrue(cmdRes.contains("\tNo\t"), "One key should not be hsync'ed");
+ } finally {
+ // Cleanup
+ IOUtils.closeQuietly(streams);
+ }
+
+ }
+
+ /**
+ * Return stdout as a String, then clears existing output.
+ */
+ private String getStdOut() throws UnsupportedEncodingException {
+ String res = out.toString(UTF_8.name());
+ out.reset();
+ return res;
+ }
+
/**
* Helper function to retrieve Ozone client configuration for trash testing.
* @param hostPrefix Scheme + Authority. e.g. ofs://om-service-test1
@@ -1451,6 +1588,8 @@ public class TestOzoneShellHA {
}
@Test
+ // Run this UT last. This interferes with testAdminCmdListOpenFiles
+ @Order(Integer.MAX_VALUE)
public void testRecursiveBucketDelete()
throws Exception {
String volume1 = "volume50";
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHAWithFSO.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHAWithFSO.java
index 0c6a5e8143..485cb2c919 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHAWithFSO.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneShellHAWithFSO.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.shell;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -37,6 +38,7 @@ public class TestOzoneShellHAWithFSO extends TestOzoneShellHA
{
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT,
OMConfigKeys.OZONE_BUCKET_LAYOUT_FILE_SYSTEM_OPTIMIZED);
+ conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
startCluster(conf);
}
diff --git
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index cf5630ecee..1dbe0853a1 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -146,6 +146,8 @@ enum Type {
SetSnapshotProperty = 128;
ListStatusLight = 129;
GetSnapshotInfo = 130;
+
+ ListOpenFiles = 132;
}
enum SafeMode {
@@ -281,6 +283,8 @@ message OMRequest {
optional MultipartUploadsExpiredAbortRequest
multipartUploadsExpiredAbortRequest = 126;
optional SetSnapshotPropertyRequest SetSnapshotPropertyRequest =
127;
optional SnapshotInfoRequest SnapshotInfoRequest =
128;
+
+ optional ListOpenFilesRequest ListOpenFilesRequest =
132;
}
message OMResponse {
@@ -403,6 +407,8 @@ message OMResponse {
optional ListStatusLightResponse listStatusLightResponse =
129;
optional SnapshotInfoResponse SnapshotInfoResponse =
130;
optional OMLockDetailsProto omLockDetails =
131;
+
+ optional ListOpenFilesResponse ListOpenFilesResponse =
132;
}
enum Status {
@@ -1529,6 +1535,24 @@ message CancelPrepareResponse {
}
+message ListOpenFilesRequest {
+ optional string path = 1;
+ optional uint32 count = 2;
+ optional string token = 3;
+}
+
+message ListOpenFilesResponse {
+ // size of openKeyTable and openFileTable combined
+ optional uint64 totalOpenKeyCount = 1;
+ // indicates if there are more entries to be retrieved under the given path
+ optional bool hasMore = 2;
+ // continuation token should match a dbKey in openKeyTable or openFileTable
+ optional string continuationToken = 3;
+ // result
+ repeated uint64 clientID = 4;
+ repeated KeyInfo keyInfo = 5;
+}
+
message ServicePort {
enum Type {
RPC = 1;
diff --git
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 68c5cf758e..00bf575205 100644
---
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.om.helpers.ListKeysResult;
+import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
import org.apache.hadoop.ozone.om.helpers.OmDBUserPrincipalInfo;
@@ -55,6 +56,8 @@ import org.apache.hadoop.hdds.utils.db.Table;
import com.google.common.annotations.VisibleForTesting;
import org.apache.ozone.compaction.log.CompactionLogEntry;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
/**
* OM metadata manager interface.
*/
@@ -122,9 +125,17 @@ public interface OMMetadataManager extends
DBStoreHAManager {
* @param key - key name
* @return DB key as String.
*/
-
String getOzoneKey(String volume, String bucket, String key);
+ /**
+ * Get DB key for a key or prefix in an FSO bucket given existing
+ * volume and bucket names.
+ */
+ String getOzoneKeyFSO(String volumeName,
+ String bucketName,
+ String keyPrefix)
+ throws IOException;
+
/**
* Given a volume, bucket and a key, return the corresponding DB directory
* key.
@@ -149,6 +160,17 @@ public interface OMMetadataManager extends
DBStoreHAManager {
*/
String getOpenKey(String volume, String bucket, String key, long id);
+ /**
+ * Returns client ID in Long of an OpenKeyTable DB Key String.
+ * @param dbOpenKeyName An OpenKeyTable DB Key String.
+ * @return Client ID (Long)
+ */
+ static long getClientIDFromOpenKeyDBKey(String dbOpenKeyName) {
+ final int lastPrefix = dbOpenKeyName.lastIndexOf(OM_KEY_PREFIX);
+ final String clientIdString = dbOpenKeyName.substring(lastPrefix + 1);
+ return Long.parseLong(clientIdString);
+ }
+
/**
* Given a volume, check if it is empty, i.e there are no buckets inside it.
*
@@ -188,6 +210,24 @@ public interface OMMetadataManager extends
DBStoreHAManager {
boolean hasSnapshot)
throws IOException;
+ /**
+ * Inner implementation of listOpenFiles. Called after all the arguments are
+ * checked and processed by Ozone Manager.
+ * @param bucketLayout
+ * @param maxKeys
+ * @param dbOpenKeyPrefix
+ * @param hasContToken
+ * @param dbContTokenPrefix
+ * @return ListOpenFilesResult
+ * @throws IOException
+ */
+ ListOpenFilesResult listOpenFiles(BucketLayout bucketLayout,
+ int maxKeys,
+ String dbOpenKeyPrefix,
+ boolean hasContToken,
+ String dbContTokenPrefix)
+ throws IOException;
+
/**
* Returns a list of keys represented by {@link OmKeyInfo} in the given
* bucket.
@@ -278,6 +318,12 @@ public interface OMMetadataManager extends
DBStoreHAManager {
List<OmVolumeArgs> listVolumes(String userName, String prefix,
String startKey, int maxKeys) throws IOException;
+ /**
+ * Get total open key count (estimated, due to the nature of RocksDB impl)
+ * of both OpenKeyTable and OpenFileTable.
+ */
+ long getTotalOpenKeyCount() throws IOException;
+
/**
* Returns the names of up to {@code count} open keys whose age is
* greater than or equal to {@code expireThreshold}.
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
index 4e9039252f..e641d13270 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
@@ -102,7 +102,9 @@ public enum OMAction implements AuditAction {
SNAPSHOT_INFO,
SET_TIMES,
- ABORT_EXPIRED_MULTIPART_UPLOAD;
+ ABORT_EXPIRED_MULTIPART_UPLOAD,
+
+ LIST_OPEN_FILES;
@Override
public String getAction() {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index ed5efbefe5..86fa867060 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -80,6 +80,7 @@ public class OMMetrics implements OmMetadataReaderMetrics {
private @Metric MutableCounterLong numCreateFile;
private @Metric MutableCounterLong numLookupFile;
private @Metric MutableCounterLong numListStatus;
+ private @Metric MutableCounterLong numListOpenFiles;
private @Metric MutableCounterLong numOpenKeyDeleteRequests;
private @Metric MutableCounterLong numOpenKeysSubmittedForDeletion;
@@ -176,6 +177,7 @@ public class OMMetrics implements OmMetadataReaderMetrics {
private @Metric MutableCounterLong numCreateFileFails;
private @Metric MutableCounterLong numLookupFileFails;
private @Metric MutableCounterLong numListStatusFails;
+ private @Metric MutableCounterLong numListOpenFilesFails;
private @Metric MutableCounterLong getNumGetKeyInfoFails;
private @Metric MutableCounterLong numRecoverLeaseFails;
@@ -428,10 +430,18 @@ public class OMMetrics implements OmMetadataReaderMetrics
{
}
public void incNumListS3BucketsFails() {
- numBucketOps.incr();
numBucketS3ListFails.incr();
}
+ public void incNumListOpenFiles() {
+ numKeyOps.incr();
+ numListOpenFiles.incr();
+ }
+
+ public void incNumListOpenFilesFails() {
+ numListOpenFilesFails.incr();
+ }
+
public void incNumInitiateMultipartUploads() {
numKeyOps.incr();
numInitiateMultipartUploads.incr();
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index e272cb8692..1969bce918 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.ozone.om.codec.TokenIdentifierCodec;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.helpers.ListKeysResult;
+import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
import org.apache.hadoop.ozone.om.helpers.OmDBUserPrincipalInfo;
@@ -75,6 +76,7 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload;
import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@@ -102,6 +104,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import static org.apache.hadoop.ozone.OzoneConsts.DB_TRANSIENT_MARKER;
+import static org.apache.hadoop.ozone.OzoneConsts.HSYNC_CLIENT_ID;
import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_LIMIT;
@@ -273,14 +276,14 @@ public class OmMetadataManagerImpl implements
OMMetadataManager,
private Table bucketTable;
private Table<String, OmKeyInfo> keyTable;
private Table deletedTable;
- private Table openKeyTable;
+ private Table<String, OmKeyInfo> openKeyTable;
private Table<String, OmMultipartKeyInfo> multipartInfoTable;
private Table<String, S3SecretValue> s3SecretTable;
private Table dTokenTable;
private Table prefixTable;
private Table<String, OmDirectoryInfo> dirTable;
private Table<String, OmKeyInfo> fileTable;
- private Table openFileTable;
+ private Table<String, OmKeyInfo> openFileTable;
private Table transactionInfoTable;
private Table metaTable;
@@ -475,7 +478,7 @@ public class OmMetadataManagerImpl implements
OMMetadataManager,
}
@Override
- public Table getOpenKeyTable(BucketLayout bucketLayout) {
+ public Table<String, OmKeyInfo> getOpenKeyTable(BucketLayout bucketLayout) {
if (bucketLayout.isFileSystemOptimized()) {
return openFileTable;
}
@@ -842,6 +845,18 @@ public class OmMetadataManagerImpl implements
OMMetadataManager,
return builder.toString();
}
+ @Override
+ public String getOzoneKeyFSO(String volumeName,
+ String bucketName,
+ String keyPrefix)
+ throws IOException {
+ final long volumeId = getVolumeId(volumeName);
+ final long bucketId = getBucketId(volumeName, bucketName);
+ // FSO keyPrefix could look like: -9223372036854774527/key1
+ return getOzoneKey(Long.toString(volumeId),
+ Long.toString(bucketId), keyPrefix);
+ }
+
@Override
public String getOzoneDirKey(String volume, String bucket, String key) {
key = OzoneFSUtils.addTrailingSlashIfNeeded(key);
@@ -1175,6 +1190,94 @@ public class OmMetadataManagerImpl implements
OMMetadataManager,
return keyTable.iterator();
}
+ @Override
+ public ListOpenFilesResult listOpenFiles(BucketLayout bucketLayout,
+ int maxKeys,
+ String dbOpenKeyPrefix,
+ boolean hasContToken,
+ String dbContTokenPrefix)
+ throws IOException {
+
+ List<OpenKeySession> openKeySessionList = new ArrayList<>();
+ int currentCount = 0;
+ final boolean hasMore;
+ final String retContToken;
+
+ // TODO: If we want "better" results, we want to iterate cache like
+ // listKeys do. But that complicates the iteration logic by quite a bit.
+ // And if we do that, we need to refactor listKeys as well to dedup.
+
+ final Table<String, OmKeyInfo> okTable, kTable;
+ okTable = getOpenKeyTable(bucketLayout);
+ // keyTable required to check key hsync metadata. TODO: HDDS-10077
+ kTable = getKeyTable(bucketLayout);
+
+ // No lock required since table iterator creates a "snapshot"
+ try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
+ openKeyIter = okTable.iterator()) {
+ KeyValue<String, OmKeyInfo> kv;
+ kv = openKeyIter.seek(dbContTokenPrefix);
+ if (hasContToken && kv.getKey().equals(dbContTokenPrefix)) {
+ // Skip one entry when cont token is specified and the current entry
+ // key is exactly the same as cont token.
+ openKeyIter.next();
+ }
+ while (currentCount < maxKeys && openKeyIter.hasNext()) {
+ kv = openKeyIter.next();
+ if (kv != null && kv.getKey().startsWith(dbOpenKeyPrefix)) {
+ String dbKey = kv.getKey();
+ long clientID = OMMetadataManager.getClientIDFromOpenKeyDBKey(dbKey);
+ OmKeyInfo omKeyInfo = kv.getValue();
+
+ // Trim client ID to get the keyTable dbKey
+ int lastSlashIdx = dbKey.lastIndexOf(OM_KEY_PREFIX);
+ String ktDbKey = dbKey.substring(0, lastSlashIdx);
+ // Check whether the key has been hsync'ed by checking keyTable
+ checkAndUpdateKeyHsyncStatus(omKeyInfo, ktDbKey, kTable);
+
+ openKeySessionList.add(
+ new OpenKeySession(clientID, omKeyInfo,
+ omKeyInfo.getLatestVersionLocations().getVersion()));
+ currentCount++;
+ }
+ }
+
+ // Set hasMore flag as a hint for client-side pagination
+ if (openKeyIter.hasNext()) {
+ KeyValue<String, OmKeyInfo> nextKv = openKeyIter.next();
+ hasMore = nextKv != null &&
nextKv.getKey().startsWith(dbOpenKeyPrefix);
+ } else {
+ hasMore = false;
+ }
+
+ // Set continuation token
+ retContToken = hasMore ? kv.getKey() : null;
+ }
+
+ return new ListOpenFilesResult(
+ getTotalOpenKeyCount(),
+ hasMore,
+ retContToken,
+ openKeySessionList);
+ }
+
+ /**
+ * Check and update OmKeyInfo from OpenKeyTable with hsync status in
KeyTable.
+ */
+ private void checkAndUpdateKeyHsyncStatus(OmKeyInfo omKeyInfo,
+ String dbKey,
+ Table<String, OmKeyInfo> kTable)
+ throws IOException {
+ OmKeyInfo ktOmKeyInfo = kTable.get(dbKey);
+ if (ktOmKeyInfo != null) {
+ // The same key in OpenKeyTable also exists in KeyTable, indicating
+ // the key has been hsync'ed
+ String hsyncClientId = ktOmKeyInfo.getMetadata().get(HSYNC_CLIENT_ID);
+ // Append HSYNC_CLIENT_ID to OmKeyInfo to be returned to the client
+ omKeyInfo.getMetadata().put(HSYNC_CLIENT_ID, hsyncClientId);
+ }
+ }
+
@Override
public ListKeysResult listKeys(String volumeName, String bucketName,
String startKey, String keyPrefix, int
maxKeys)
@@ -1732,6 +1835,13 @@ public class OmMetadataManagerImpl implements
OMMetadataManager,
return getMultipartInfoTable().isExist(multipartInfoDbKey);
}
+ @Override
+ public long getTotalOpenKeyCount() throws IOException {
+ // Get an estimated key count of OpenKeyTable + OpenFileTable
+ return openKeyTable.getEstimatedKeyCount()
+ + openFileTable.getEstimatedKeyCount();
+ }
+
@Override
public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
int count, BucketLayout bucketLayout) throws IOException {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 142b211b73..fe2b2e7332 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -43,6 +43,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.StringTokenizer;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
@@ -88,6 +89,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.OzoneManagerVersion;
+import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob;
import org.apache.hadoop.ozone.om.lock.OMLockDetails;
import org.apache.hadoop.ozone.om.ratis_snapshot.OmRatisSnapshotProvider;
@@ -260,6 +262,7 @@ import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAU
import static org.apache.hadoop.ozone.OzoneConsts.DB_TRANSIENT_MARKER;
import static org.apache.hadoop.ozone.OzoneConsts.DEFAULT_OM_UPDATE_ID;
import static org.apache.hadoop.ozone.OzoneConsts.LAYOUT_VERSION_KEY;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_FILE;
import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_TEMP_FILE;
import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
@@ -298,7 +301,9 @@ import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DETE
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FEATURE_NOT_ENABLED;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INTERNAL_ERROR;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
+import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_PATH;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
+import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_SUPPORTED_OPERATION;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PERMISSION_DENIED;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
import static
org.apache.hadoop.ozone.om.s3.S3SecretStoreConfigurationKeys.DEFAULT_SECRET_STORAGE_TYPE;
@@ -3187,14 +3192,100 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
}
@Override
- public void transferLeadership(String newLeaderId)
+ public ListOpenFilesResult listOpenFiles(String path,
+ int maxKeys,
+ String contToken)
throws IOException {
- final UserGroupInformation ugi = getRemoteUser();
- if (!isAdmin(ugi)) {
- throw new OMException(
- "Only Ozone admins are allowed to transfer raft leadership.",
- PERMISSION_DENIED);
+
+ metrics.incNumListOpenFiles();
+ checkAdminUserPrivilege("list open files.");
+
+ // Using final to make sure they are assigned once and only once in
+ // every branch.
+ final String dbOpenKeyPrefix, dbContTokenPrefix;
+ final String volumeName, bucketName;
+ final BucketLayout bucketLayout;
+
+ // Process path prefix
+ if (path == null || path.isEmpty() || path.equals(OM_KEY_PREFIX)) {
+ // path is root
+ dbOpenKeyPrefix = "";
+ volumeName = "";
+ bucketName = "";
+ // default to FSO's OpenFileTable. TODO: client option to pass
OBS/LEGACY?
+ bucketLayout = BucketLayout.FILE_SYSTEM_OPTIMIZED;
+ } else {
+ // path is bucket or key prefix, break it down to volume, bucket, prefix
+ StringTokenizer tokenizer = new StringTokenizer(path, OM_KEY_PREFIX);
+ // Validate path to avoid NoSuchElementException
+ if (tokenizer.countTokens() < 2) {
+ metrics.incNumListOpenFilesFails();
+ throw new OMException("Invalid path: " + path + ". " +
+ "Only root level or bucket level path is supported at this time",
+ INVALID_PATH);
+ }
+
+ volumeName = tokenizer.nextToken();
+ bucketName = tokenizer.nextToken();
+
+ OmBucketInfo bucketInfo;
+ try {
+ // as expected, getBucketInfo throws if volume or bucket does not exist
+ bucketInfo = getBucketInfo(volumeName, bucketName);
+ } catch (OMException ex) {
+ metrics.incNumListOpenFilesFails();
+ throw ex;
+ } catch (IOException ex) {
+ // Wrap IOException in OMException
+ metrics.incNumListOpenFilesFails();
+ throw new OMException(ex.getMessage(), NOT_SUPPORTED_OPERATION);
+ }
+
+ final String keyPrefix;
+ if (tokenizer.hasMoreTokens()) {
+ // Collect the rest but trim the leading "/"
+ keyPrefix = tokenizer.nextToken("").substring(1);
+ } else {
+ keyPrefix = "";
+ }
+
+ // Determine dbKey prefix based on the bucket type
+ bucketLayout = bucketInfo.getBucketLayout();
+ switch (bucketLayout) {
+ case FILE_SYSTEM_OPTIMIZED:
+ dbOpenKeyPrefix = metadataManager.getOzoneKeyFSO(
+ volumeName, bucketName, keyPrefix);
+ break;
+ case OBJECT_STORE:
+ case LEGACY:
+ dbOpenKeyPrefix = metadataManager.getOzoneKey(
+ volumeName, bucketName, keyPrefix);
+ break;
+ default:
+ metrics.incNumListOpenFilesFails();
+ throw new OMException("Unsupported bucket layout: " +
+ bucketInfo.getBucketLayout(), NOT_SUPPORTED_OPERATION);
+ }
+ }
+
+ // Process cont. token
+ if (contToken == null || contToken.isEmpty()) {
+ // if a continuation token is not specified
+ dbContTokenPrefix = dbOpenKeyPrefix;
+ } else {
+ dbContTokenPrefix = contToken;
}
+
+ // arg processing done. call inner impl (table iteration)
+ return metadataManager.listOpenFiles(
+ bucketLayout, maxKeys, dbOpenKeyPrefix,
+ !StringUtils.isEmpty(contToken), dbContTokenPrefix);
+ }
+
+ @Override
+ public void transferLeadership(String newLeaderId)
+ throws IOException {
+ checkAdminUserPrivilege("transfer raft leadership.");
if (!isRatisEnabled) {
throw new IOException("OM HA not enabled.");
}
@@ -3311,7 +3402,7 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
if (!isAdmin(ugi)) {
final OMException omEx = new OMException(
"Only Ozone admins are allowed to list tenants.", PERMISSION_DENIED);
- AUDIT.logWriteFailure(buildAuditMessageForFailure(
+ AUDIT.logReadFailure(buildAuditMessageForFailure(
OMAction.LIST_TENANT, new LinkedHashMap<>(), omEx));
throw omEx;
}
@@ -4781,13 +4872,7 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
public String printCompactionLogDag(String fileNamePrefix,
String graphType)
throws IOException {
-
- final UserGroupInformation ugi = getRemoteUser();
- if (!isAdmin(ugi)) {
- throw new OMException(
- "Only Ozone admins are allowed to print compaction DAG.",
- PERMISSION_DENIED);
- }
+ checkAdminUserPrivilege("print compaction DAG.");
if (StringUtils.isBlank(fileNamePrefix)) {
fileNamePrefix = "dag-";
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index d96637b461..8918cf070b 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.ozone.om.helpers.ListKeysLightResult;
import org.apache.hadoop.ozone.om.helpers.ListKeysResult;
import org.apache.hadoop.ozone.om.helpers.DBUpdates;
import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
+import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -57,6 +58,7 @@ import
org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatusLight;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
@@ -80,6 +82,8 @@ import
org.apache.hadoop.ozone.om.upgrade.DisallowedUntilLayoutVersion;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelSnapshotDiffRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelSnapshotDiffResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListOpenFilesRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListOpenFilesResponse;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListSnapshotDiffJobRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListSnapshotDiffJobResponse;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessRequest;
@@ -249,6 +253,11 @@ public class OzoneManagerRequestHandler implements
RequestHandler {
listMultipartUploads(request.getListMultipartUploadsRequest());
responseBuilder.setListMultipartUploadsResponse(response);
break;
+ case ListOpenFiles:
+ ListOpenFilesResponse listOpenFilesResponse = listOpenFiles(
+ request.getListOpenFilesRequest(), request.getVersion());
+ responseBuilder.setListOpenFilesResponse(listOpenFilesResponse);
+ break;
case ServiceList:
ServiceListResponse serviceListResponse = getServiceList(
request.getServiceListRequest());
@@ -921,6 +930,31 @@ public class OzoneManagerRequestHandler implements
RequestHandler {
return resp;
}
+ private ListOpenFilesResponse listOpenFiles(ListOpenFilesRequest req,
+ int clientVersion)
+ throws IOException {
+ ListOpenFilesResponse.Builder resp = ListOpenFilesResponse.newBuilder();
+
+ ListOpenFilesResult res =
+ impl.listOpenFiles(req.getPath(), req.getCount(), req.getToken());
+ // TODO: Is there a clean way to avoid ser-de for responses:
+ // OM does: ListOpenFilesResult -> ListOpenFilesResponse
+ // Client : ListOpenFilesResponse -> ListOpenFilesResult
+
+ resp.setTotalOpenKeyCount(res.getTotalOpenKeyCount());
+ resp.setHasMore(res.hasMore());
+ if (res.getContinuationToken() != null) {
+ resp.setContinuationToken(res.getContinuationToken());
+ }
+
+ for (OpenKeySession e : res.getOpenKeys()) {
+ resp.addClientID(e.getId());
+ resp.addKeyInfo(e.getKeyInfo().getProtobuf(clientVersion));
+ }
+
+ return resp.build();
+ }
+
private ServiceListResponse getServiceList(ServiceListRequest request)
throws IOException {
ServiceListResponse.Builder resp = ServiceListResponse.newBuilder();
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
index e6debcdc23..451417ba3d 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
@@ -24,11 +24,13 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
@@ -44,11 +46,13 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
@@ -558,36 +562,120 @@ public class TestOmMetadataManager {
}
- private static BucketLayout getDefaultBucketLayout() {
- return BucketLayout.DEFAULT;
- }
+ /**
+ * Tests inner impl of listOpenFiles with different bucket types with and
+ * without pagination. NOTE: This UT does NOT test hsync here since the hsync
+ * status check is done purely on the client side.
+ * @param bucketLayout BucketLayout
+ */
+ @ParameterizedTest
+ @EnumSource
+ public void testListOpenFiles(BucketLayout bucketLayout) throws Exception {
+ final long clientID = 1000L;
- @Test
- public void testGetExpiredOpenKeys() throws Exception {
- testGetExpiredOpenKeys(BucketLayout.DEFAULT);
- }
+ String volumeName = "volume-lof";
+ String bucketName = "bucket-" + bucketLayout.name().toLowerCase();
+ String keyPrefix = "key";
- @Test
- public void testGetExpiredOpenKeysExcludeMPUs() throws Exception {
- testGetExpiredOpenKeysExcludeMPUKeys(BucketLayout.DEFAULT);
- }
+ OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+ omMetadataManager, bucketLayout);
- @Test
- public void testGetExpiredOpenKeysFSO() throws Exception {
- testGetExpiredOpenKeys(BucketLayout.FILE_SYSTEM_OPTIMIZED);
- }
+ long volumeId = -1L, bucketId = -1L;
+ if (bucketLayout.isFileSystemOptimized()) {
+ volumeId = omMetadataManager.getVolumeId(volumeName);
+ bucketId = omMetadataManager.getBucketId(volumeName, bucketName);
+ }
- @Test
- public void testGetExpiredOpenKeysExcludeMPUsFSO() throws Exception {
- testGetExpiredOpenKeysExcludeMPUKeys(BucketLayout.FILE_SYSTEM_OPTIMIZED);
+ int numOpenKeys = 3;
+ List<String> openKeys = new ArrayList<>();
+ for (int i = 0; i < numOpenKeys; i++) {
+ final OmKeyInfo keyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName,
+ bucketName, keyPrefix + i, HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.ONE, 0L, Time.now());
+
+ final String dbOpenKeyName;
+ if (bucketLayout.isFileSystemOptimized()) {
+ keyInfo.setParentObjectID(i);
+ keyInfo.setFileName(OzoneFSUtils.getFileName(keyInfo.getKeyName()));
+ OMRequestTestUtils.addFileToKeyTable(true, false,
+ keyInfo.getFileName(), keyInfo, clientID, 0L, omMetadataManager);
+ dbOpenKeyName = omMetadataManager.getOpenFileName(volumeId, bucketId,
+ keyInfo.getParentObjectID(), keyInfo.getFileName(), clientID);
+ } else {
+ OMRequestTestUtils.addKeyToTable(true, false,
+ keyInfo, clientID, 0L, omMetadataManager);
+ dbOpenKeyName = omMetadataManager.getOpenKey(volumeName, bucketName,
+ keyInfo.getKeyName(), clientID);
+ }
+ openKeys.add(dbOpenKeyName);
+ }
+
+ String dbPrefix;
+ if (bucketLayout.isFileSystemOptimized()) {
+ dbPrefix = omMetadataManager.getOzoneKeyFSO(volumeName, bucketName, "");
+ } else {
+ dbPrefix = omMetadataManager.getOzoneKey(volumeName, bucketName, "");
+ }
+
+ // Without pagination
+ ListOpenFilesResult res = omMetadataManager.listOpenFiles(
+ bucketLayout, 100, dbPrefix, false, dbPrefix);
+
+ assertEquals(numOpenKeys, res.getTotalOpenKeyCount());
+ assertEquals(false, res.hasMore());
+ List<OpenKeySession> keySessionList = res.getOpenKeys();
+ assertEquals(numOpenKeys, keySessionList.size());
+ // Verify that every single open key shows up in the result, and in order
+ for (int i = 0; i < numOpenKeys; i++) {
+ OpenKeySession keySession = keySessionList.get(i);
+ assertEquals(keyPrefix + i, keySession.getKeyInfo().getKeyName());
+ assertEquals(clientID, keySession.getId());
+ assertEquals(0, keySession.getOpenVersion());
+ }
+
+ // With pagination
+ int pageSize = 2;
+ int numExpectedKeys = pageSize;
+ res = omMetadataManager.listOpenFiles(
+ bucketLayout, pageSize, dbPrefix, false, dbPrefix);
+ // total open key count should still be 3
+ assertEquals(numOpenKeys, res.getTotalOpenKeyCount());
+ // hasMore should have been set
+ assertEquals(true, res.hasMore());
+ keySessionList = res.getOpenKeys();
+ assertEquals(numExpectedKeys, keySessionList.size());
+ for (int i = 0; i < numExpectedKeys; i++) {
+ OpenKeySession keySession = keySessionList.get(i);
+ assertEquals(keyPrefix + i, keySession.getKeyInfo().getKeyName());
+ assertEquals(clientID, keySession.getId());
+ assertEquals(0, keySession.getOpenVersion());
+ }
+
+ // Get the second page
+ res = omMetadataManager.listOpenFiles(
+ bucketLayout, pageSize, dbPrefix, true, res.getContinuationToken());
+ numExpectedKeys = numOpenKeys - pageSize;
+ // total open key count should still be 3
+ assertEquals(numOpenKeys, res.getTotalOpenKeyCount());
+ assertEquals(false, res.hasMore());
+ keySessionList = res.getOpenKeys();
+ assertEquals(numExpectedKeys, keySessionList.size());
+ for (int i = 0; i < numExpectedKeys; i++) {
+ OpenKeySession keySession = keySessionList.get(i);
+ assertEquals(keyPrefix + (pageSize + i),
+ keySession.getKeyInfo().getKeyName());
+ assertEquals(clientID, keySession.getId());
+ assertEquals(0, keySession.getOpenVersion());
+ }
}
- @Test
- public void testGetExpiredMultipartUploads() throws Exception {
- testGetExpiredMPUs();
+ private static BucketLayout getDefaultBucketLayout() {
+ return BucketLayout.DEFAULT;
}
- private void testGetExpiredOpenKeys(BucketLayout bucketLayout)
+ @ParameterizedTest
+ @EnumSource
+ public void testGetExpiredOpenKeys(BucketLayout bucketLayout)
throws Exception {
final String bucketName = UUID.randomUUID().toString();
final String volumeName = UUID.randomUUID().toString();
@@ -665,7 +753,9 @@ public class TestOmMetadataManager {
assertThat(expiredKeys).containsAll(names);
}
- private void testGetExpiredOpenKeysExcludeMPUKeys(
+ @ParameterizedTest
+ @EnumSource
+ public void testGetExpiredOpenKeysExcludeMPUKeys(
BucketLayout bucketLayout) throws Exception {
final String bucketName = UUID.randomUUID().toString();
final String volumeName = UUID.randomUUID().toString();
@@ -753,7 +843,8 @@ public class TestOmMetadataManager {
.isEmpty());
}
- private void testGetExpiredMPUs() throws Exception {
+ @Test
+ public void testGetExpiredMPUs() throws Exception {
final String bucketName = UUID.randomUUID().toString();
final String volumeName = UUID.randomUUID().toString();
final int numExpiredMPUs = 4;
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/ListOpenFilesSubCommand.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/ListOpenFilesSubCommand.java
new file mode 100644
index 0000000000..9ede45a80a
--- /dev/null
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/ListOpenFilesSubCommand.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.admin.om;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.server.JsonUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import picocli.CommandLine;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/**
+ * Handler of ozone admin om list-open-files command.
+ */
[email protected](
+ name = "list-open-files",
+ aliases = {"list-open-keys", "lof", "lok"},
+ description = "Lists open files (keys) in Ozone Manager.",
+ mixinStandardHelpOptions = true,
+ versionProvider = HddsVersionProvider.class
+)
+public class ListOpenFilesSubCommand implements Callable<Void> {
+
+ @CommandLine.ParentCommand
+ private OMAdmin parent;
+
+ @CommandLine.Option(
+ names = {"--service-id", "--om-service-id"},
+ description = "Ozone Manager Service ID",
+ required = false
+ )
+ private String omServiceId;
+
+ @CommandLine.Option(
+ names = {"--service-host"},
+ description = "Ozone Manager Host. If OM HA is enabled, use --service-id
instead. "
+ + "If you must use --service-host with OM HA, this must point
directly to the leader OM. "
+ + "This option is required when --service-id is not provided or when
HA is not enabled."
+ )
+ private String omHost;
+
+ @CommandLine.Option(names = { "--json" },
+ defaultValue = "false",
+ description = "Format output as JSON")
+ private boolean json;
+
+ // Conforms to ListOptions, but not all in ListOptions applies here thus
+ // not using that directly
+ @CommandLine.Option(
+ names = {"-p", "--prefix"},
+ description = "Filter results by the specified path on the server side.",
+ defaultValue = "/"
+ )
+ private String pathPrefix;
+
+ @CommandLine.Option(
+ names = {"-l", "--length"},
+ description = "Maximum number of items to list",
+ defaultValue = "100"
+ )
+ private int limit;
+
+ @CommandLine.Option(
+ names = {"-s", "--start"},
+ description = "The item to start the listing from.\n" +
+ "i.e. continuation token. " +
+ "This will be excluded from the result.",
+ defaultValue = ""
+ )
+ private String startItem;
+
+ @Override
+ public Void call() throws Exception {
+
+ if (StringUtils.isEmpty(omServiceId) && StringUtils.isEmpty(omHost)) {
+ System.err.println("Error: Please specify -id or -host");
+ return null;
+ }
+
+ OzoneManagerProtocol ozoneManagerClient =
+ parent.createOmClient(omServiceId, omHost, false);
+
+ ListOpenFilesResult res =
+ ozoneManagerClient.listOpenFiles(pathPrefix, limit, startItem);
+
+ if (json) {
+ // Print detailed JSON
+ printOpenKeysListAsJson(res);
+ } else {
+ // Human friendly output
+ printOpenKeysList(res);
+ }
+
+ return null;
+ }
+
+ private void printOpenKeysListAsJson(ListOpenFilesResult res)
+ throws IOException {
+ System.out.println(JsonUtils.toJsonStringWithDefaultPrettyPrinter(res));
+ }
+
+ private void printOpenKeysList(ListOpenFilesResult res) {
+
+ List<OpenKeySession> openFileList = res.getOpenKeys();
+
+ String msg = res.getTotalOpenKeyCount() +
+ " total open files (est.). Showing " + openFileList.size() +
+ " open files (limit " + limit + ") under path prefix:\n " +
pathPrefix;
+
+ if (startItem != null && !startItem.isEmpty()) {
+ msg += "\nafter continuation token:\n " + startItem;
+ }
+ msg += "\n\nClient ID\t\tCreation time\tHsync'ed\tOpen File Path";
+ System.out.println(msg);
+
+ for (OpenKeySession e : openFileList) {
+ long clientId = e.getId();
+ OmKeyInfo omKeyInfo = e.getKeyInfo();
+ String line = clientId + "\t" + omKeyInfo.getCreationTime() + "\t";
+
+ if (omKeyInfo.isHsync()) {
+ String hsyncClientIdStr =
+ omKeyInfo.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID);
+ long hsyncClientId = Long.parseLong(hsyncClientIdStr);
+ if (clientId == hsyncClientId) {
+ line += "Yes\t\t";
+ } else {
+ // last hsync'ed with a different client ID than the client that
+ // initially opens the file (!)
+ line += "Yes w/ cid " + hsyncClientIdStr + "\t";
+ }
+ } else {
+ line += "No\t\t";
+ }
+
+ line += getFullPathFromKeyInfo(omKeyInfo);
+
+ System.out.println(line);
+ }
+
+ // Compose next batch's command
+ if (res.hasMore()) {
+ String nextBatchCmd = getCmdForNextBatch(res.getContinuationToken());
+
+ System.out.println("\n" +
+ "To get the next batch of open keys, run:\n " + nextBatchCmd);
+ } else {
+ System.out.println("\nReached the end of the list.");
+ }
+ }
+
+ /**
+ * @return the command to get the next batch of open keys
+ */
+ private String getCmdForNextBatch(String lastElementFullPath) {
+ String nextBatchCmd = "ozone admin om lof";
+ if (omServiceId != null && !omServiceId.isEmpty()) {
+ nextBatchCmd += " -id=" + omServiceId;
+ }
+ if (omHost != null && !omHost.isEmpty()) {
+ nextBatchCmd += " -host=" + omHost;
+ }
+ if (json) {
+ nextBatchCmd += " --json";
+ }
+ nextBatchCmd += " --length=" + limit;
+ if (pathPrefix != null && !pathPrefix.isEmpty()) {
+ nextBatchCmd += " --prefix=" + pathPrefix;
+ }
+ nextBatchCmd += " --start=" + lastElementFullPath;
+ return nextBatchCmd;
+ }
+
+ private String getFullPathFromKeyInfo(OmKeyInfo oki) {
+ return "/" + oki.getVolumeName() +
+ "/" + oki.getBucketName() +
+ "/" + oki.getPath();
+ }
+
+}
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/OMAdmin.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/OMAdmin.java
index ce7d4ed7a7..3162c55635 100644
---
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/OMAdmin.java
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/OMAdmin.java
@@ -53,6 +53,7 @@ import java.util.Collection;
versionProvider = HddsVersionProvider.class,
subcommands = {
FinalizeUpgradeSubCommand.class,
+ ListOpenFilesSubCommand.class,
GetServiceRolesSubcommand.class,
PrepareSubCommand.class,
CancelPrepareSubCommand.class,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]