This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-7593 by this push:
new 7828ba3ea2 HDDS-11260. [hsync] Add Ozone Manager protocol version
(#7015)
7828ba3ea2 is described below
commit 7828ba3ea23885928e783051c4e367d4f19db15e
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Fri Aug 2 09:28:25 2024 -0700
HDDS-11260. [hsync] Add Ozone Manager protocol version (#7015)
---
.../apache/hadoop/ozone/OzoneManagerVersion.java | 1 +
.../hadoop/ozone/client/io/KeyOutputStream.java | 18 ++++++++++++++
.../ozone/client/protocol/ClientProtocol.java | 24 +++++++++++++++++++
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 28 ++++++++++++++++++++--
.../fs/ozone/BasicOzoneClientAdapterImpl.java | 8 ++++---
.../ozone/BasicRootedOzoneClientAdapterImpl.java | 6 +++--
.../hadoop/ozone/client/ClientProtocolStub.java | 13 ++++++++++
.../ozone/admin/om/ListOpenFilesSubCommand.java | 10 ++++++++
8 files changed, 101 insertions(+), 7 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
index c53aca2f15..eec2ceeb5e 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
@@ -43,6 +43,7 @@ public enum OzoneManagerVersion implements ComponentVersion {
OBJECT_TAG(5, "OzoneManager version that supports object tags"),
ATOMIC_REWRITE_KEY(6, "OzoneManager version that supports rewriting key as
atomic operation"),
+ HBASE_SUPPORT(7, "OzoneManager version that supports HBase integration"),
FUTURE_VERSION(-1, "Used internally in the client when the server side is "
+ " newer and an unknown server version has arrived to the client.");
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 649c2e7dcc..2e44539826 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.OzoneManagerVersion;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
@@ -107,6 +108,7 @@ public class KeyOutputStream extends OutputStream
*/
private boolean atomicKeyCreation;
private ContainerClientMetrics clientMetrics;
+ private OzoneManagerVersion ozoneManagerVersion;
public KeyOutputStream(ReplicationConfig replicationConfig,
BlockOutputStreamEntryPool blockOutputStreamEntryPool) {
this.replication = replicationConfig;
@@ -157,6 +159,7 @@ public class KeyOutputStream extends OutputStream
this.atomicKeyCreation = b.getAtomicKeyCreation();
this.streamBufferArgs = b.getStreamBufferArgs();
this.clientMetrics = b.getClientMetrics();
+ this.ozoneManagerVersion = b.ozoneManagerVersion;
}
/**
@@ -457,6 +460,11 @@ public class KeyOutputStream extends OutputStream
throw new UnsupportedOperationException("The replication factor = "
+ replication.getRequiredNodes() + " <= 1");
}
+ if (ozoneManagerVersion.compareTo(OzoneManagerVersion.HBASE_SUPPORT) < 0) {
+ throw new UnsupportedOperationException("Hsync API requires OM version "
+ + OzoneManagerVersion.HBASE_SUPPORT + " or later. Current OM version
"
+ + ozoneManagerVersion);
+ }
checkNotClosed();
final long hsyncPos = writeOffset;
@@ -599,6 +607,7 @@ public class KeyOutputStream extends OutputStream
private boolean atomicKeyCreation = false;
private StreamBufferArgs streamBufferArgs;
private Supplier<ExecutorService> executorServiceSupplier;
+ private OzoneManagerVersion ozoneManagerVersion;
public String getMultipartUploadID() {
return multipartUploadID;
@@ -721,6 +730,15 @@ public class KeyOutputStream extends OutputStream
return executorServiceSupplier;
}
+ public Builder setOmVersion(OzoneManagerVersion omVersion) {
+ this.ozoneManagerVersion = omVersion;
+ return this;
+ }
+
+ public OzoneManagerVersion getOmVersion() {
+ return ozoneManagerVersion;
+ }
+
public KeyOutputStream build() {
return new KeyOutputStream(this);
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 55985a18d3..68812a7eb4 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -50,6 +50,8 @@ import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.DeleteTenantState;
import org.apache.hadoop.ozone.om.helpers.ErrorInfo;
+import org.apache.hadoop.ozone.om.helpers.LeaseKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
@@ -1313,4 +1315,26 @@ public interface ClientProtocol {
* */
void setTimes(OzoneObj obj, String keyName, long mtime, long atime)
throws IOException;
+
+ /**
+ * Start the lease recovery of a file.
+ *
+ * @param volumeName - The volume name.
+ * @param bucketName - The bucket name.
+ * @param keyName - The key the user wants to recover.
+ * @param force - Whether to force recovery of the file.
+ * @return LeaseKeyInfo KeyInfo of file under recovery
+ * @throws IOException if an error occurs
+ */
+ LeaseKeyInfo recoverLease(String volumeName, String bucketName, String
keyName, boolean force) throws IOException;
+
+ /**
+ * Recover and commit a key. This will make the change from the client
+ * visible. The client is identified by the clientID.
+ *
+ * @param args the key to commit
+ * @param clientID the client identification
+ * @throws IOException if an error occurs
+ */
+ void recoverKey(OmKeyArgs args, long clientID) throws IOException;
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index ac0cf1d09c..1bad03c7d2 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -100,6 +100,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.DeleteTenantState;
import org.apache.hadoop.ozone.om.helpers.ErrorInfo;
import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
+import org.apache.hadoop.ozone.om.helpers.LeaseKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDeleteKeys;
@@ -336,7 +337,7 @@ public class RpcClient implements ClientProtocol {
return xceiverClientManager;
}
- private OzoneManagerVersion getOmVersion(ServiceInfoEx info) {
+ public static OzoneManagerVersion getOmVersion(ServiceInfoEx info) {
OzoneManagerVersion version = OzoneManagerVersion.CURRENT;
for (ServiceInfo si : info.getServiceInfoList()) {
if (si.getNodeType() == HddsProtos.NodeType.OM) {
@@ -2562,7 +2563,8 @@ public class RpcClient implements ClientProtocol {
.setAtomicKeyCreation(isS3GRequest.get())
.setClientMetrics(clientMetrics)
.setExecutorServiceSupplier(writeExecutor)
- .setStreamBufferArgs(streamBufferArgs);
+ .setStreamBufferArgs(streamBufferArgs)
+ .setOmVersion(omVersion);
}
@Override
@@ -2684,6 +2686,28 @@ public class RpcClient implements ClientProtocol {
ozoneManagerClient.setTimes(builder.build(), mtime, atime);
}
+ @Override
+ public LeaseKeyInfo recoverLease(String volumeName, String bucketName,
+ String keyName, boolean force)
+ throws IOException {
+ if (omVersion.compareTo(OzoneManagerVersion.HBASE_SUPPORT) < 0) {
+ throw new UnsupportedOperationException("Lease recovery API requires OM
version "
+ + OzoneManagerVersion.HBASE_SUPPORT + " or later. Current OM version
"
+ + omVersion);
+ }
+ return ozoneManagerClient.recoverLease(volumeName, bucketName, keyName,
force);
+ }
+
+ @Override
+ public void recoverKey(OmKeyArgs args, long clientID) throws IOException {
+ if (omVersion.compareTo(OzoneManagerVersion.HBASE_SUPPORT) < 0) {
+ throw new UnsupportedOperationException("Lease recovery API requires OM
version "
+ + OzoneManagerVersion.HBASE_SUPPORT + " or later. Current OM version
"
+ + omVersion);
+ }
+ ozoneManagerClient.recoverKey(args, clientID);
+ }
+
private static ExecutorService createThreadPoolExecutor(
int corePoolSize, int maximumPoolSize, String threadNameFormat) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index e4df4c242b..658685779e 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.client.rpc.RpcClient;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -703,8 +704,8 @@ public class BasicOzoneClientAdapterImpl implements
OzoneClientAdapter {
incrementCounter(Statistic.INVOCATION_RECOVER_FILE_PREPARE, 1);
try {
- return ozoneClient.getProxy().getOzoneManagerClient().recoverLease(
- volume.getName(), bucket.getName(), pathStr, force);
+ ClientProtocol clientProtocol = ozoneClient.getProxy();
+ return clientProtocol.recoverLease(volume.getName(), bucket.getName(),
pathStr, force);
} catch (OMException ome) {
if (ome.getResult() == NOT_A_FILE) {
throw new FileNotFoundException("Path is not a file. " +
ome.getMessage());
@@ -720,7 +721,8 @@ public class BasicOzoneClientAdapterImpl implements
OzoneClientAdapter {
public void recoverFile(OmKeyArgs keyArgs) throws IOException {
incrementCounter(Statistic.INVOCATION_RECOVER_FILE, 1);
- ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L);
+ ClientProtocol clientProtocol = ozoneClient.getProxy();
+ clientProtocol.recoverKey(keyArgs, 0L);
}
@Override
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index ab05dd69c3..76533e7774 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -1395,7 +1395,8 @@ public class BasicRootedOzoneClientAdapterImpl
try {
OzoneBucket bucket = getBucket(ofsPath, false);
- return ozoneClient.getProxy().getOzoneManagerClient().recoverLease(
+ ClientProtocol clientProtocol = ozoneClient.getProxy();
+ return clientProtocol.recoverLease(
bucket.getVolumeName(), bucket.getName(), ofsPath.getKeyName(),
force);
} catch (OMException ome) {
if (ome.getResult() == NOT_A_FILE) {
@@ -1414,7 +1415,8 @@ public class BasicRootedOzoneClientAdapterImpl
public void recoverFile(OmKeyArgs keyArgs) throws IOException {
incrementCounter(Statistic.INVOCATION_RECOVER_FILE, 1);
- ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L);
+ ClientProtocol clientProtocol = ozoneClient.getProxy();
+ clientProtocol.recoverKey(keyArgs, 0L);
}
@Override
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
index bea831a065..21c3f8358f 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.om.helpers.DeleteTenantState;
import org.apache.hadoop.ozone.om.helpers.ErrorInfo;
+import org.apache.hadoop.ozone.om.helpers.LeaseKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
@@ -768,4 +770,15 @@ public class ClientProtocolStub implements ClientProtocol {
throws IOException {
}
+ @Override
+ public LeaseKeyInfo recoverLease(String volumeName, String bucketName,
+ String keyName, boolean force) throws IOException {
+ return null;
+ }
+
+ @Override
+ public void recoverKey(OmKeyArgs args, long clientID) throws IOException {
+
+ }
+
}
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/ListOpenFilesSubCommand.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/ListOpenFilesSubCommand.java
index 723a4ec402..221f9f9a1c 100644
---
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/ListOpenFilesSubCommand.java
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/ListOpenFilesSubCommand.java
@@ -21,9 +21,12 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.server.JsonUtils;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.OzoneManagerVersion;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import picocli.CommandLine;
@@ -112,6 +115,13 @@ public class ListOpenFilesSubCommand implements
Callable<Void> {
OzoneManagerProtocol ozoneManagerClient =
parent.createOmClient(omServiceId, omHost, false);
+ ServiceInfoEx serviceInfoEx = ozoneManagerClient.getServiceInfo();
+ final OzoneManagerVersion omVersion =
RpcClient.getOmVersion(serviceInfoEx);
+ if (omVersion.compareTo(OzoneManagerVersion.HBASE_SUPPORT) < 0) {
+ System.err.println("Error: This command requires OzoneManager version "
+ + OzoneManagerVersion.HBASE_SUPPORT.name() + " or later.");
+ return null;
+ }
ListOpenFilesResult res =
ozoneManagerClient.listOpenFiles(pathPrefix, limit, startItem);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]