This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-10656-atomic-key-overwrite
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to 
refs/heads/HDDS-10656-atomic-key-overwrite by this push:
     new 51466d7f75 HDDS-10527. Rewrite key atomically (#6385)
51466d7f75 is described below

commit 51466d7f754c3c41baffec8c0d77bec3f2fc4270
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Wed May 15 10:48:12 2024 +0100

    HDDS-10527. Rewrite key atomically (#6385)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   1 +
 .../apache/hadoop/ozone/client/OzoneBucket.java    |  25 +++-
 .../org/apache/hadoop/ozone/client/OzoneKey.java   |   8 +-
 .../hadoop/ozone/client/OzoneKeyDetails.java       |  30 ++++-
 .../ozone/client/protocol/ClientProtocol.java      |  23 ++++
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  77 +++++++++----
 .../apache/hadoop/ozone/om/helpers/OmKeyArgs.java  |  34 +++++-
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  |  40 +++++++
 ...OzoneManagerProtocolClientSideTranslatorPB.java |   4 +
 .../hadoop/ozone/om/helpers/TestOmKeyInfo.java     |   2 +
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  50 ++++++++
 .../src/main/proto/OmClientProtocol.proto          |  14 +++
 .../hadoop/ozone/om/request/RequestAuditor.java    |   4 +
 .../ozone/om/request/key/OMKeyCommitRequest.java   |  24 ++++
 .../ozone/om/request/key/OMKeyCreateRequest.java   |  14 +++
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |   4 +-
 .../om/request/key/TestOMKeyCommitRequest.java     |  87 +++++++++++++-
 .../om/request/key/TestOMKeyCreateRequest.java     | 128 ++++++++++++++++++---
 .../request/key/TestOMKeyCreateRequestWithFSO.java |   3 +-
 .../hadoop/ozone/client/ClientProtocolStub.java    |   8 ++
 .../hadoop/ozone/client/OzoneBucketStub.java       |  32 ++++++
 21 files changed, 558 insertions(+), 54 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index f3c08b252b..e29aeac1e1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -343,6 +343,7 @@ public final class OzoneConsts {
   public static final String BUCKET_LAYOUT = "bucketLayout";
   public static final String TENANT = "tenant";
   public static final String USER_PREFIX = "userPrefix";
+  public static final String REWRITE_GENERATION = "rewriteGeneration";
 
   // For multi-tenancy
   public static final String TENANT_ID_USERNAME_DELIMITER = "$";
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index 8d153a948c..012f029f51 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -470,6 +470,28 @@ public class OzoneBucket extends WithMetadata {
         .createKey(volumeName, name, key, size, replicationConfig, 
keyMetadata);
   }
 
+  /**
+   * Atomically rewrites an existing key. The key read before 
invoking this API
+   * must remain unchanged for the rewrite to succeed. This is controlled by 
the existingKeyGeneration
+   * parameter. If the key is replaced or updated, its 
generation will change. If the
+   * generation has changed since the existing key was read, either the 
initial key create will fail,
+   * or the key will fail to commit after the data has been written, as the 
checks are carried out
+   * both at key open and commit time.
+   *
+   * @param keyName Existing key to rewrite. This must exist in the bucket.
+   * @param size The size of the new key
+   * @param existingKeyGeneration The generation of the existing key, which is 
checked for changes at key create
+   *                              and commit time.
+   * @param replicationConfig The replication configuration for the key to be 
rewritten.
+   * @param metadata custom key value metadata
+   * @return OzoneOutputStream to which the data has to be written.
+   * @throws IOException if the key cannot be opened for rewrite, e.g. it no longer exists or its generation changed
+   */
+  public OzoneOutputStream rewriteKey(String keyName, long size, long 
existingKeyGeneration,
+      ReplicationConfig replicationConfig, Map<String, String> metadata) 
throws IOException {
+    return proxy.rewriteKey(volumeName, name, keyName, size, 
existingKeyGeneration, replicationConfig, metadata);
+  }
+
   /**
    * Creates a new key in the bucket, with default replication type RATIS and
    * with replication factor THREE.
@@ -1784,8 +1806,7 @@ public class OzoneBucket extends WithMetadata {
             keyInfo.getDataSize(), keyInfo.getCreationTime(),
             keyInfo.getModificationTime(),
             keyInfo.getReplicationConfig(),
-            keyInfo.isFile(),
-            keyInfo.getOwnerName());
+            keyInfo.isFile(), keyInfo.getOwnerName());
         keysResultList.add(ozoneKey);
       }
     }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
index 3663f6f654..2d32a72720 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKey.java
@@ -70,10 +70,6 @@ public class OzoneKey {
    */
   private final boolean isFile;
 
-  /**
-   * Constructs OzoneKey from OmKeyInfo.
-   *
-   */
   @SuppressWarnings("parameternumber")
   public OzoneKey(String volumeName, String bucketName,
       String keyName, long size, long creationTime,
@@ -201,6 +197,10 @@ public class OzoneKey {
     return isFile;
   }
 
+  /**
+   * Constructs OzoneKey from OmKeyInfo.
+   *
+   */
   public static OzoneKey fromKeyInfo(OmKeyInfo keyInfo) {
     return new OzoneKey(keyInfo.getVolumeName(), keyInfo.getBucketName(),
         keyInfo.getKeyName(), keyInfo.getDataSize(), keyInfo.getCreationTime(),
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
index 6b44fa1dca..cd2978fce1 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
@@ -42,6 +42,12 @@ public class OzoneKeyDetails extends OzoneKey {
 
   private final CheckedSupplier<OzoneInputStream, IOException> contentSupplier;
 
+  /**
+   * The generation of an existing key. This can be used with atomic rewrites, 
to
+   * ensure the key has not changed since the key details were read. May be null if not supplied.
+   */
+  private final Long generation;
+
   /**
    * Constructs OzoneKeyDetails from OmKeyInfo.
    */
@@ -53,12 +59,30 @@ public class OzoneKeyDetails extends OzoneKey {
       Map<String, String> metadata,
       FileEncryptionInfo feInfo,
       CheckedSupplier<OzoneInputStream, IOException> contentSupplier,
-      boolean isFile, String owner) {
+      boolean isFile, String owner, Long generation) {
     super(volumeName, bucketName, keyName, size, creationTime,
         modificationTime, replicationConfig, metadata, isFile, owner);
     this.ozoneKeyLocations = ozoneKeyLocations;
     this.feInfo = feInfo;
     this.contentSupplier = contentSupplier;
+    this.generation = generation;
+  }
+
+  /**
+   * Constructs OzoneKeyDetails without a key generation; the generation is left null.
+   */
+  @SuppressWarnings("parameternumber")
+  public OzoneKeyDetails(String volumeName, String bucketName, String keyName,
+                         long size, long creationTime, long modificationTime,
+                         List<OzoneKeyLocation> ozoneKeyLocations,
+                         ReplicationConfig replicationConfig,
+                         Map<String, String> metadata,
+                         FileEncryptionInfo feInfo,
+                         CheckedSupplier<OzoneInputStream, IOException> 
contentSupplier,
+                         boolean isFile, String owner) {
+    this(volumeName, bucketName, keyName, size, creationTime,
+        modificationTime, ozoneKeyLocations, replicationConfig, metadata, 
feInfo, contentSupplier,
+        isFile, owner, null);
   }
 
   /**
@@ -72,6 +96,10 @@ public class OzoneKeyDetails extends OzoneKey {
     return feInfo;
   }
 
+  public Long getGeneration() {
+    return generation;
+  }
+
   /**
    * Get OzoneInputStream to read the content of the key.
    * @return OzoneInputStream
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 912a3138c4..0725b4f253 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -353,6 +353,29 @@ public interface ClientProtocol {
       Map<String, String> metadata)
       throws IOException;
 
+  /**
+   * Atomically rewrites an existing key. The key read before 
invoking this API
+   * must remain unchanged for the rewrite to succeed. This is controlled by 
the existingKeyGeneration
+   * parameter. If the key is replaced or updated, its 
generation will change. If the
+   * generation has changed since the existing key was read, either the 
initial key create will fail,
+   * or the key will fail to commit after the data has been written, as the 
checks are carried out
+   * both at key open and commit time.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param keyName Existing key to rewrite. This must exist in the bucket.
+   * @param size The size of the new key
+   * @param existingKeyGeneration The generation of the existing key, which is 
checked for changes at key create
+   *                              and commit time.
+   * @param replicationConfig The replication configuration for the key to be 
rewritten.
+   * @param metadata custom key value metadata
+   * @return {@link OzoneOutputStream}
+   * @throws IOException if the key cannot be opened for rewrite, e.g. it no longer exists or its generation changed
+   */
+  OzoneOutputStream rewriteKey(String volumeName, String bucketName, String 
keyName,
+      long size, long existingKeyGeneration, ReplicationConfig 
replicationConfig,
+       Map<String, String> metadata) throws IOException;
+
   /**
    * Writes a key in an existing bucket.
    * @param volumeName Name of the Volume
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 2f97f2f3cc..000a194150 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1387,26 +1387,7 @@ public class RpcClient implements ClientProtocol {
       ReplicationConfig replicationConfig,
       Map<String, String> metadata)
       throws IOException {
-    verifyVolumeName(volumeName);
-    verifyBucketName(bucketName);
-    if (checkKeyNameEnabled) {
-      HddsClientUtils.verifyKeyName(keyName);
-    }
-    HddsClientUtils.checkNotNull(keyName);
-    if (omVersion
-        .compareTo(OzoneManagerVersion.ERASURE_CODED_STORAGE_SUPPORT) < 0) {
-      if (replicationConfig != null &&
-          replicationConfig.getReplicationType()
-              == HddsProtos.ReplicationType.EC) {
-        throw new IOException("Can not set the replication of the key to"
-            + " Erasure Coded replication, as OzoneManager does not support"
-            + " Erasure Coded replication.");
-      }
-    }
-
-    if (replicationConfig != null) {
-      replicationConfigValidator.validate(replicationConfig);
-    }
+    createKeyPreChecks(volumeName, bucketName, keyName, replicationConfig);
     String ownerName = getRealUserInfo().getShortUserName();
 
     OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
@@ -1431,6 +1412,59 @@ public class RpcClient implements ClientProtocol {
     return createOutputStream(openKey);
   }
 
+  @Override
+  public OzoneOutputStream rewriteKey(String volumeName, String bucketName, 
String keyName,
+      long size, long existingKeyGeneration, ReplicationConfig 
replicationConfig,
+      Map<String, String> metadata) throws IOException {
+    createKeyPreChecks(volumeName, bucketName, keyName, replicationConfig);
+    String ownerName = getRealUserInfo().getShortUserName();
+
+    OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(size)
+        .setReplicationConfig(replicationConfig)
+        .addAllMetadataGdpr(metadata)
+        .setLatestVersionLocation(getLatestVersionLocation)
+        .setOwnerName(ownerName)
+        .setRewriteGeneration(existingKeyGeneration);
+
+    OpenKeySession openKey = ozoneManagerClient.openKey(builder.build());
+    // For a bucket with layout OBJECT_STORE, when creating an empty file (size=0),
+    // OM will set DataSize to OzoneConfigKeys#OZONE_SCM_BLOCK_SIZE,
+    // which will cause S3G's atomic write length check to fail,
+    // so reset size to 0 here.
+    if (isS3GRequest.get() && size == 0) {
+      openKey.getKeyInfo().setDataSize(0);
+    }
+    return createOutputStream(openKey);
+  }
+
+  private void createKeyPreChecks(String volumeName, String bucketName, String 
keyName,
+      ReplicationConfig replicationConfig) throws IOException {
+    verifyVolumeName(volumeName);
+    verifyBucketName(bucketName);
+    if (checkKeyNameEnabled) {
+      HddsClientUtils.verifyKeyName(keyName);
+    }
+    HddsClientUtils.checkNotNull(keyName);
+    if (omVersion
+        .compareTo(OzoneManagerVersion.ERASURE_CODED_STORAGE_SUPPORT) < 0) {
+      if (replicationConfig != null &&
+          replicationConfig.getReplicationType()
+              == HddsProtos.ReplicationType.EC) {
+        throw new IOException("Can not set the replication of the key to"
+            + " Erasure Coded replication, as OzoneManager does not support"
+            + " Erasure Coded replication.");
+      }
+    }
+
+    if (replicationConfig != null) {
+      replicationConfigValidator.validate(replicationConfig);
+    }
+  }
+
   @Override
   public OzoneDataStreamOutput createStreamKey(
       String volumeName, String bucketName, String keyName, long size,
@@ -1722,7 +1756,8 @@ public class RpcClient implements ClientProtocol {
         keyInfo.getModificationTime(), ozoneKeyLocations,
         keyInfo.getReplicationConfig(), keyInfo.getMetadata(),
         keyInfo.getFileEncryptionInfo(),
-        () -> getInputStreamWithRetryFunction(keyInfo), keyInfo.isFile(), 
keyInfo.getOwnerName());
+        () -> getInputStreamWithRetryFunction(keyInfo), keyInfo.isFile(), 
keyInfo.getOwnerName(),
+        keyInfo.getGeneration());
   }
 
   @Override
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
index e8ad2564f3..35417c0b32 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
@@ -53,6 +53,13 @@ public final class OmKeyArgs implements Auditable {
   private final boolean recursive;
   private final boolean headOp;
   private final boolean forceUpdateContainerCacheFromSCM;
+  // rewriteGeneration, when used in key creation, indicates that a
+  // key with the same keyName should exist with the given generation.
+  // For a key commit to succeed, the original key should still be present 
with the
+  // generation unchanged.
+  // This allows a key to be created and committed atomically if the original 
has not
+  // been modified.
+  private Long rewriteGeneration = null;
 
   private OmKeyArgs(Builder b) {
     this.volumeName = b.volumeName;
@@ -72,6 +79,7 @@ public final class OmKeyArgs implements Auditable {
     this.headOp = b.headOp;
     this.forceUpdateContainerCacheFromSCM = b.forceUpdateContainerCacheFromSCM;
     this.ownerName = b.ownerName;
+    this.rewriteGeneration = b.rewriteGeneration;
   }
 
   public boolean getIsMultipartKey() {
@@ -150,6 +158,10 @@ public final class OmKeyArgs implements Auditable {
     return forceUpdateContainerCacheFromSCM;
   }
 
+  public Long getRewriteGeneration() {
+    return rewriteGeneration;
+  }
+
   @Override
   public Map<String, String> toAuditMap() {
     Map<String, String> auditMap = new LinkedHashMap<>();
@@ -173,7 +185,7 @@ public final class OmKeyArgs implements Auditable {
   }
 
   public OmKeyArgs.Builder toBuilder() {
-    return new OmKeyArgs.Builder()
+    OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
         .setKeyName(keyName)
@@ -190,11 +202,16 @@ public final class OmKeyArgs implements Auditable {
         .setLatestVersionLocation(latestVersionLocation)
         .setAcls(acls)
         .setForceUpdateContainerCacheFromSCM(forceUpdateContainerCacheFromSCM);
+
+    if (rewriteGeneration != null) {
+      builder.setRewriteGeneration(rewriteGeneration);
+    }
+    return builder;
   }
 
   @Nonnull
   public KeyArgs toProtobuf() {
-    return KeyArgs.newBuilder()
+    KeyArgs.Builder builder = KeyArgs.newBuilder()
         .setVolumeName(getVolumeName())
         .setBucketName(getBucketName())
         .setKeyName(getKeyName())
@@ -203,8 +220,11 @@ public final class OmKeyArgs implements Auditable {
         .setLatestVersionLocation(getLatestVersionLocation())
         .setHeadOp(isHeadOp())
         .setForceUpdateContainerCacheFromSCM(
-            isForceUpdateContainerCacheFromSCM())
-        .build();
+            isForceUpdateContainerCacheFromSCM());
+    if (rewriteGeneration != null) {
+      builder.setRewriteGeneration(rewriteGeneration);
+    }
+    return builder.build();
   }
 
   /**
@@ -228,6 +248,7 @@ public final class OmKeyArgs implements Auditable {
     private boolean recursive;
     private boolean headOp;
     private boolean forceUpdateContainerCacheFromSCM;
+    private Long rewriteGeneration = null;
 
     public Builder setVolumeName(String volume) {
       this.volumeName = volume;
@@ -327,6 +348,11 @@ public final class OmKeyArgs implements Auditable {
       return this;
     }
 
+    public Builder setRewriteGeneration(long generation) {
+      this.rewriteGeneration = generation;
+      return this;
+    }
+
     public OmKeyArgs build() {
       return new OmKeyArgs(this);
     }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index bf31be67c5..84a75d09e5 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -102,6 +102,14 @@ public final class OmKeyInfo extends WithParentObjectId
    */
   private final CopyOnWriteArrayList<OzoneAcl> acls;
 
+  // rewriteGeneration, when used in key creation, indicates that a
+  // key with the same keyName should exist with the given generation.
+  // For a key commit to succeed, the original key should still be present 
with the
+  // generation unchanged.
+  // This allows a key to be created and committed atomically if the original 
has not
+  // been modified. OMKeyCommitRequest resets it to null so it is not persisted in the key table.
+  private Long rewriteGeneration = null;
+
   private OmKeyInfo(Builder b) {
     super(b);
     this.volumeName = b.volumeName;
@@ -118,6 +126,7 @@ public final class OmKeyInfo extends WithParentObjectId
     this.fileName = b.fileName;
     this.isFile = b.isFile;
     this.ownerName = b.ownerName;
+    this.rewriteGeneration = b.rewriteGeneration;
   }
 
   public String getVolumeName() {
@@ -160,10 +169,26 @@ public final class OmKeyInfo extends WithParentObjectId
     return fileName;
   }
 
+  public void setRewriteGeneration(Long generation) {
+    this.rewriteGeneration = generation;
+  }
+
+  public Long getRewriteGeneration() {
+    return rewriteGeneration;
+  }
+
   public String getOwnerName() {
     return ownerName;
   }
 
+  /**
+   * Returns the generation of the object. Note this is currently the same as 
updateID for a key.
+   * @return the generation, currently the value of {@link #getUpdateID()}
+   */
+  public long getGeneration() {
+    return getUpdateID();
+  }
+
   public synchronized OmKeyLocationInfoGroup getLatestVersionLocations() {
     return keyLocationVersions.size() == 0 ? null :
         keyLocationVersions.get(keyLocationVersions.size() - 1);
@@ -435,6 +460,7 @@ public final class OmKeyInfo extends WithParentObjectId
     private FileChecksum fileChecksum;
 
     private boolean isFile;
+    private Long rewriteGeneration = null;
 
     public Builder() {
     }
@@ -563,6 +589,11 @@ public final class OmKeyInfo extends WithParentObjectId
       return this;
     }
 
+    public Builder setRewriteGeneration(Long existingGeneration) {
+      this.rewriteGeneration = existingGeneration;
+      return this;
+    }
+
     public OmKeyInfo build() {
       return new OmKeyInfo(this);
     }
@@ -667,6 +698,9 @@ public final class OmKeyInfo extends WithParentObjectId
       kb.setFileEncryptionInfo(OMPBHelper.convert(encInfo));
     }
     kb.setIsFile(isFile);
+    if (rewriteGeneration != null) {
+      kb.setRewriteGeneration(rewriteGeneration);
+    }
     if (ownerName != null) {
       kb.setOwnerName(ownerName);
     }
@@ -716,6 +750,9 @@ public final class OmKeyInfo extends WithParentObjectId
     if (keyInfo.hasIsFile()) {
       builder.setFile(keyInfo.getIsFile());
     }
+    if (keyInfo.hasRewriteGeneration()) {
+      builder.setRewriteGeneration(keyInfo.getRewriteGeneration());
+    }
 
     if (keyInfo.hasOwnerName()) {
       builder.setOwnerName(keyInfo.getOwnerName());
@@ -830,6 +867,9 @@ public final class OmKeyInfo extends WithParentObjectId
     if (fileChecksum != null) {
       builder.setFileChecksum(fileChecksum);
     }
+    if (rewriteGeneration != null) {
+      builder.setRewriteGeneration(rewriteGeneration);
+    }
 
     return builder.build();
   }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 61dd3f5660..a62898411d 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -726,6 +726,10 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
 
     keyArgs.setSortDatanodes(args.getSortDatanodes());
 
+    if (args.getRewriteGeneration() != null) {
+      keyArgs.setRewriteGeneration(args.getRewriteGeneration());
+    }
+
     req.setKeyArgs(keyArgs.build());
 
     OMRequest omRequest = createOMRequest(Type.CreateKey)
diff --git 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
index 4aead0cd8b..852d49871c 100644
--- 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
+++ 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
@@ -67,6 +67,7 @@ public class TestOmKeyInfo {
     assertFalse(key.isHsync());
     key.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID, "clientid");
     assertTrue(key.isHsync());
+    assertEquals(5678L, key.getRewriteGeneration());
   }
 
   @Test
@@ -123,6 +124,7 @@ public class TestOmKeyInfo {
         .setReplicationConfig(replicationConfig)
         .addMetadata("key1", "value1")
         .addMetadata("key2", "value2")
+        .setRewriteGeneration(5678L)
         .build();
   }
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 172867903c..e3b0c4a7a7 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -1097,6 +1097,56 @@ public abstract class TestOzoneRpcClientAbstract {
     }
   }
 
+  @Test
+  public void testRewriteKey() throws IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+
+    String value = "sample value";
+    String rewriteValue = "rewrite value";
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+
+    // TODO - only works on object store layout for now.
+    BucketArgs args = BucketArgs.newBuilder()
+        .setBucketLayout(BucketLayout.OBJECT_STORE)
+        .build();
+
+    volume.createBucket(bucketName, args);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String keyName = UUID.randomUUID().toString();
+    try (OzoneOutputStream out = bucket.createKey(keyName, 
value.getBytes(UTF_8).length,
+        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE), 
new HashMap<>())) {
+      out.write(value.getBytes(UTF_8));
+    }
+    OzoneKeyDetails keyDetails = bucket.getKey(keyName);
+
+    try (OzoneOutputStream out = bucket.rewriteKey(keyDetails.getName(), 
keyDetails.getDataSize(),
+        keyDetails.getGeneration(), 
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+        keyDetails.getMetadata())) {
+      out.write(rewriteValue.getBytes(UTF_8));
+    }
+
+    try (OzoneInputStream is = bucket.readKey(keyName)) {
+      byte[] fileContent = new byte[rewriteValue.getBytes(UTF_8).length];
+      is.read(fileContent);
+      assertEquals(rewriteValue, new String(fileContent, UTF_8));
+    }
+
+    // Delete the key
+    bucket.deleteKey(keyName);
+
+    // Now try the rewrite again, and it should fail as the originally read 
key is no longer there.
+    assertThrows(IOException.class, () -> {
+      try (OzoneOutputStream out = bucket.rewriteKey(keyDetails.getName(), 
keyDetails.getDataSize(),
+          keyDetails.getGeneration(), 
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+          keyDetails.getMetadata())) {
+        out.write(rewriteValue.getBytes(UTF_8));
+      }
+    });
+  }
+
   @Test
   public void testCheckUsedBytesQuota() throws IOException {
     String volumeName = UUID.randomUUID().toString();
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 4a18f308c9..75a2562ae7 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1026,6 +1026,13 @@ message KeyArgs {
     // Force OM to update container cache location from SCL
     optional bool forceUpdateContainerCacheFromSCM = 20;
     optional string ownerName = 21;
+    // rewriteGeneration, when used in key creation, indicates that a
+    // key with the same keyName should exist with the given generation.
+    // For a key commit to succeed, the original key should still be present 
with the
+    // generation unchanged.
+    // This allows a key to be created and committed atomically if the original 
has not
+    // been modified.
+    optional uint64 rewriteGeneration = 22;
 }
 
 message KeyLocation {
@@ -1109,6 +1116,13 @@ message KeyInfo {
     optional FileChecksumProto fileChecksum = 18;
     optional bool isFile = 19;
     optional string ownerName = 20;
+    // rewriteGeneration, when used in key creation, indicates that a
+    // key with the same keyName should exist with the given generation.
+    // For a key commit to succeed, the original key should still be present 
with the
+    // generation unchanged.
+    // This allows a key to be created and committed atomically if the original 
has not
+    // been modified. It is cleared on commit so it is not persisted in the key table.
+    optional uint64 rewriteGeneration = 21;
 }
 
 message BasicKeyInfo {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/RequestAuditor.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/RequestAuditor.java
index 78e67bb8ed..910bb2f431 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/RequestAuditor.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/RequestAuditor.java
@@ -82,6 +82,10 @@ public interface RequestAuditor {
         auditMap.put(OzoneConsts.REPLICATION_CONFIG,
             ECReplicationConfig.toString(keyArgs.getEcReplicationConfig()));
       }
+      if (keyArgs.hasRewriteGeneration()) {
+        auditMap.put(OzoneConsts.REWRITE_GENERATION,
+            String.valueOf(keyArgs.getRewriteGeneration()));
+      }
       for (HddsProtos.KeyValue item : keyArgs.getMetadataList()) {
         if (ETAG.equals(item.getKey())) {
           auditMap.put(ETAG, item.getValue());
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index 5c3593b078..48effbee4c 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -240,6 +240,12 @@ public class OMKeyCommitRequest extends OMKeyRequest {
         throw new OMException("Failed to " + action + " key, as " + dbOpenKey +
             " entry is not found in the OpenKey table", KEY_NOT_FOUND);
       }
+
+      validateAtomicRewrite(keyToDelete, omKeyInfo, auditMap);
+      // Optimistic locking validation has passed. Now set the rewrite fields 
to null so they are
+      // not persisted in the key table.
+      omKeyInfo.setRewriteGeneration(null);
+
       omKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf(
           commitKeyArgs.getMetadataList()));
       if (isHSync) {
@@ -497,4 +503,22 @@ public class OMKeyCommitRequest extends OMKeyRequest {
     }
     return req;
   }
+
+  private void validateAtomicRewrite(OmKeyInfo existing, OmKeyInfo toCommit, 
Map<String, String> auditMap)
+      throws OMException {
+    if (toCommit.getRewriteGeneration() != null) {
+      // These values are not passed in the request keyArgs, so add them into 
the auditMap if they are present
+      // in the open key entry.
+      auditMap.put(OzoneConsts.REWRITE_GENERATION, 
String.valueOf(toCommit.getRewriteGeneration()));
+      if (existing == null) {
+        throw new OMException("Atomic rewrite is not allowed for a new key", 
KEY_NOT_FOUND);
+      }
+      if (!toCommit.getRewriteGeneration().equals(existing.getUpdateID())) {
+        throw new OMException("Cannot commit as current generation (" + 
existing.getUpdateID() +
+            ") does not match with the rewrite generation (" + 
toCommit.getRewriteGeneration() + ")",
+            KEY_NOT_FOUND);
+      }
+    }
+  }
+
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
index e9a9f00719..4ede37e6b4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
@@ -231,6 +231,7 @@ public class OMKeyCreateRequest extends OMKeyRequest {
           keyName);
       OmKeyInfo dbKeyInfo = omMetadataManager.getKeyTable(getBucketLayout())
           .getIfExist(dbKeyName);
+      validateAtomicRewrite(dbKeyInfo, keyArgs);
 
       OmBucketInfo bucketInfo =
           getBucketInfo(omMetadataManager, volumeName, bucketName);
@@ -440,4 +441,17 @@ public class OMKeyCreateRequest extends OMKeyRequest {
     }
     return req;
   }
+
+  private void validateAtomicRewrite(OmKeyInfo dbKeyInfo, KeyArgs keyArgs)
+      throws OMException {
+    if (keyArgs.hasRewriteGeneration()) {
+      // If a key does not exist, or if it exists but the updateID do not match, then fail this request.
+      if (dbKeyInfo == null) {
+        throw new OMException("Key not found during expected rewrite", OMException.ResultCodes.KEY_NOT_FOUND);
+      }
+      if (dbKeyInfo.getUpdateID() != keyArgs.getRewriteGeneration()) {
+        throw new OMException("Generation mismatch during expected rewrite", OMException.ResultCodes.KEY_NOT_FOUND);
+      }
+    }
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 1addd2431b..a29e63466e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -775,11 +775,13 @@ public abstract class OMKeyRequest extends OMClientRequest {
       dbKeyInfo.setReplicationConfig(replicationConfig);
 
       // Construct a new metadata map from KeyArgs.
-      // Clear the old one when the key is overwritten.
       dbKeyInfo.getMetadata().clear();
       dbKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf(
           keyArgs.getMetadataList()));
 
+      if (keyArgs.hasRewriteGeneration()) {
+        dbKeyInfo.setRewriteGeneration(keyArgs.getRewriteGeneration());
+      }
       dbKeyInfo.setFileEncryptionInfo(encInfo);
       return dbKeyInfo;
     }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
index 9719865db1..2fc38cbd3a 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.om.request.key;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -31,11 +32,13 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
@@ -58,9 +61,12 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -116,7 +122,7 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
     OMClientResponse omClientResponse =
         omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
 
-    assertEquals(OzoneManagerProtocolProtos.Status.OK,
+    assertEquals(OK,
         omClientResponse.getOMResponse().getStatus());
 
     // Entry should be deleted from openKey Table.
@@ -183,7 +189,7 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
     OMClientResponse omClientResponse =
         omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
 
-    assertEquals(OzoneManagerProtocolProtos.Status.OK,
+    assertEquals(OK,
         omClientResponse.getOMResponse().getStatus());
 
     // Entry should be deleted from openKey Table.
@@ -218,6 +224,75 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
         omKeyInfo.getLatestVersionLocations().getLocationList());
   }
 
+  @Test
+  public void testAtomicRewrite() throws Exception {
+    if (getBucketLayout() == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+     // TODO - does not work with in FSO for now
+      return;
+    }
+
+    Table<String, OmKeyInfo> openKeyTable = omMetadataManager.getOpenKeyTable(getBucketLayout());
+    Table<String, OmKeyInfo> closedKeyTable = omMetadataManager.getKeyTable(getBucketLayout());
+
+    OMRequest modifiedOmRequest = doPreExecute(createCommitKeyRequest());
+    OMKeyCommitRequest omKeyCommitRequest = getOmKeyCommitRequest(modifiedOmRequest);
+    KeyArgs keyArgs = modifiedOmRequest.getCommitKeyRequest().getKeyArgs();
+
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager, omKeyCommitRequest.getBucketLayout());
+
+    // Append new blocks
+    List<OmKeyLocationInfo> allocatedLocationList =
+        keyArgs.getKeyLocationsList().stream()
+            .map(OmKeyLocationInfo::getFromProtobuf)
+            .collect(Collectors.toList());
+
+    OmKeyInfo.Builder omKeyInfoBuilder = OMRequestTestUtils.createOmKeyInfo(
+        volumeName, bucketName, keyName, replicationConfig, new OmKeyLocationInfoGroup(version, new ArrayList<>()));
+    omKeyInfoBuilder.setRewriteGeneration(1L);
+    OmKeyInfo omKeyInfo = omKeyInfoBuilder.build();
+    omKeyInfo.appendNewBlocks(allocatedLocationList, false);
+    List<OzoneAcl> acls = Collections.singletonList(OzoneAcl.parseAcl("user:foo:rw"));
+    omKeyInfo.addAcl(acls.get(0));
+
+    String openKey = getOzonePathKey() + "/" + modifiedOmRequest.getCommitKeyRequest().getClientID();
+
+    openKeyTable.put(openKey, omKeyInfo);
+    OmKeyInfo openKeyInfo = openKeyTable.get(openKey);
+    assertNotNull(openKeyInfo);
+    assertEquals(acls, openKeyInfo.getAcls());
+    // At this stage, we have an openKey, with rewrite generation of 1.
+    // However there is no closed key entry, so the commit should fail.
+    OMClientResponse omClientResponse =
+        omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
+    assertEquals(KEY_NOT_FOUND, omClientResponse.getOMResponse().getStatus());
+
+    // Now add the key to the key table, and try again, but with different generation
+    omKeyInfoBuilder.setRewriteGeneration(null);
+    omKeyInfoBuilder.setUpdateID(0L);
+    OmKeyInfo invalidKeyInfo = omKeyInfoBuilder.build();
+    closedKeyTable.put(getOzonePathKey(), invalidKeyInfo);
+    // This should fail as the updateID ia zero and the open key has rewrite generation of 1.
+    omClientResponse = omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
+    assertEquals(KEY_NOT_FOUND, omClientResponse.getOMResponse().getStatus());
+
+    omKeyInfoBuilder.setUpdateID(1L);
+    OmKeyInfo closedKeyInfo = omKeyInfoBuilder.build();
+
+    closedKeyTable.delete(getOzonePathKey());
+    closedKeyTable.put(getOzonePathKey(), closedKeyInfo);
+
+    // Now the key should commit as the updateID and rewrite Generation match.
+    omClientResponse = omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
+    assertEquals(OK, omClientResponse.getOMResponse().getStatus());
+
+    OmKeyInfo committedKey = closedKeyTable.get(getOzonePathKey());
+    assertNull(committedKey.getRewriteGeneration());
+    // Generation should be changed
+    assertNotEquals(closedKeyInfo.getGeneration(), committedKey.getGeneration());
+    assertEquals(acls, committedKey.getAcls());
+  }
+
   @Test
   public void testValidateAndUpdateCacheWithUncommittedBlocks()
       throws Exception {
@@ -260,7 +335,7 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
     OMClientResponse omClientResponse =
         omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
 
-    assertEquals(OzoneManagerProtocolProtos.Status.OK,
+    assertEquals(OK,
         omClientResponse.getOMResponse().getStatus());
 
     Map<String, RepeatedOmKeyInfo> toDeleteKeyList
@@ -385,7 +460,7 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
     
     OMClientResponse omClientResponse =
         omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
-    assertEquals(OzoneManagerProtocolProtos.Status.OK,
+    assertEquals(OK,
         omClientResponse.getOMResponse().getStatus());
 
     // Key should be present in both OpenKeyTable and KeyTable with HSync commit
@@ -550,7 +625,7 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
     OMClientResponse omClientResponse =
         omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
 
-    assertEquals(OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND,
+    assertEquals(KEY_NOT_FOUND,
         omClientResponse.getOMResponse().getStatus());
 
     omKeyInfo =
@@ -596,7 +671,7 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
     OMClientResponse omClientResponse =
         omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 102L);
 
-    assertEquals(OzoneManagerProtocolProtos.Status.OK, omClientResponse.getOMResponse().getStatus());
+    assertEquals(OK, omClientResponse.getOMResponse().getStatus());
 
     // New entry should be created in key Table.
     omKeyInfo = omMetadataManager.getKeyTable(omKeyCommitRequest.getBucketLayout()).get(ozoneKey);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
index 0790e2af3b..4c7b2aa047 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
@@ -70,6 +70,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_INDICATOR;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS;
 import static org.apache.hadoop.ozone.om.request.OMRequestTestUtils.addVolumeAndBucketToDB;
 import static org.apache.hadoop.ozone.om.request.OMRequestTestUtils.createOmKeyInfo;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.NOT_A_FILE;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -120,9 +121,9 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
     long scmBlockSize = ozoneManager.getScmBlockSize();
     for (int i = 0; i <= repConfig.getRequiredNodes(); i++) {
       doPreExecute(createKeyRequest(isMultipartKey, partNumber,
-          scmBlockSize * i, repConfig));
+          scmBlockSize * i, repConfig, null));
       doPreExecute(createKeyRequest(isMultipartKey, partNumber,
-          scmBlockSize * i + 1, repConfig));
+          scmBlockSize * i + 1, repConfig, null));
     }
   }
 
@@ -482,7 +483,7 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
     Map<String, String> initialMetadata =
         Collections.singletonMap("initialKey", "initialValue");
     OMRequest initialRequest =
-        createKeyRequest(false, 0, keyName, initialMetadata);
+        createKeyRequest(false, 0, keyName, initialMetadata, Collections.emptyList());
     OMKeyCreateRequest initialOmKeyCreateRequest =
         new OMKeyCreateRequest(initialRequest, getBucketLayout());
     OMClientResponse initialResponse =
@@ -500,7 +501,7 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
     Map<String, String> updatedMetadata =
         Collections.singletonMap("initialKey", "updatedValue");
     OMRequest updatedRequest =
-        createKeyRequest(false, 0, keyName, updatedMetadata);
+        createKeyRequest(false, 0, keyName, updatedMetadata, Collections.emptyList());
     OMKeyCreateRequest updatedOmKeyCreateRequest =
         new OMKeyCreateRequest(updatedRequest, getBucketLayout());
 
@@ -520,7 +521,7 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
 
     // Create the key request without any initial metadata
     OMRequest createRequestWithoutMetadata = createKeyRequest(false, 0, keyName,
-        null); // Passing 'null' for metadata
+        null, Collections.emptyList()); // Passing 'null' for metadata
     OMKeyCreateRequest createOmKeyCreateRequest =
         new OMKeyCreateRequest(createRequestWithoutMetadata, getBucketLayout());
 
@@ -543,7 +544,7 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
 
     // Overwrite the previously created key with new metadata
     OMRequest overwriteRequestWithMetadata =
-        createKeyRequest(false, 0, keyName, overwriteMetadata);
+        createKeyRequest(false, 0, keyName, overwriteMetadata, Collections.emptyList());
     OMKeyCreateRequest overwriteOmKeyCreateRequest =
         new OMKeyCreateRequest(overwriteRequestWithMetadata, getBucketLayout());
 
@@ -648,7 +649,7 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
 
   private OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
                                      String keyName) {
-    return createKeyRequest(isMultipartKey, partNumber, keyName, null);
+    return createKeyRequest(isMultipartKey, partNumber, keyName, null, Collections.emptyList());
   }
 
   /**
@@ -665,7 +666,8 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
    */
   protected OMRequest createKeyRequest(boolean isMultipartKey, int partNumber,
                                        String keyName,
-                                       Map<String, String> metadata) {
+                                       Map<String, String> metadata,
+                                       List<OzoneAcl> acls) {
     KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
@@ -676,6 +678,9 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
         .setType(replicationConfig.getReplicationType())
         .setLatestVersionLocation(true);
 
+    for (OzoneAcl acl : acls) {
+      keyArgs.addAcls(OzoneAcl.toProtobuf(acl));
+    }
     // Configure for multipart upload, if applicable
     if (isMultipartKey) {
       keyArgs.setDataSize(dataSize).setMultipartNumber(partNumber);
@@ -701,7 +706,14 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
 
   private OMRequest createKeyRequest(
       boolean isMultipartKey, int partNumber, long keyLength,
-      ReplicationConfig repConfig) {
+      ReplicationConfig repConfig, Long rewriteGeneration) {
+    return createKeyRequest(isMultipartKey, partNumber, keyLength, repConfig,
+        rewriteGeneration, null);
+  }
+
+  private OMRequest createKeyRequest(
+      boolean isMultipartKey, int partNumber, long keyLength,
+      ReplicationConfig repConfig, Long rewriteGeneration, Map<String, String> metaData) {
 
     KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
         .setVolumeName(volumeName).setBucketName(bucketName)
@@ -720,8 +732,17 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
     if (isMultipartKey) {
       keyArgs.setMultipartNumber(partNumber);
     }
+    if (rewriteGeneration != null) {
+      keyArgs.setRewriteGeneration(rewriteGeneration);
+    }
+    if (metaData != null) {
+      metaData.forEach((key, value) -> keyArgs.addMetadata(KeyValue.newBuilder()
+          .setKey(key)
+          .setValue(value)
+          .build()));
+    }
 
-    OzoneManagerProtocolProtos.CreateKeyRequest createKeyRequest =
+    CreateKeyRequest createKeyRequest =
         CreateKeyRequest.newBuilder().setKeyArgs(keyArgs).build();
 
     return OMRequest.newBuilder()
@@ -903,6 +924,80 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
 
   }
 
+  @ParameterizedTest
+  @MethodSource("data")
+  public void testAtomicRewrite(
+      boolean setKeyPathLock, boolean setFileSystemPaths) throws Exception {
+    when(ozoneManager.getOzoneLockProvider()).thenReturn(
+        new OzoneLockProvider(setKeyPathLock, setFileSystemPaths));
+
+    if (getBucketLayout() == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+      // TODO: Test is not applicable for FSO layout.
+      return;
+    }
+
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, omMetadataManager,
+        OmBucketInfo.newBuilder().setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setBucketLayout(getBucketLayout()));
+
+    // First, create a key with the rewrite ID - this should fail as no key exists
+    OMRequest omRequest = createKeyRequest(false, 0, 100,
+        RatisReplicationConfig.getInstance(THREE), 1L);
+    omRequest = doPreExecute(omRequest);
+    OMKeyCreateRequest omKeyCreateRequest = getOMKeyCreateRequest(omRequest);
+    OMClientResponse response = omKeyCreateRequest.validateAndUpdateCache(ozoneManager, 105L);
+    assertEquals(KEY_NOT_FOUND, response.getOMResponse().getStatus());
+
+    // Now pre-create the key in the system so we can rewrite it.
+    Map<String, String> metadata = Collections.singletonMap("metakey", "metavalue");
+    Map<String, String> reWriteMetadata = Collections.singletonMap("metakey", "rewriteMetavalue");
+
+    List<OzoneAcl> acls = Collections.singletonList(OzoneAcl.parseAcl("user:foo:rw"));
+    OmKeyInfo createdKeyInfo = createAndCheck(keyName, metadata, acls);
+    // Commit openKey entry.
+    OMRequestTestUtils.addKeyToTable(false, false,
+        createdKeyInfo, 0L, 0L, omMetadataManager);
+    // Retrieve the committed key info
+    String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName, keyName);
+    OmKeyInfo existingKeyInfo = omMetadataManager.getKeyTable(getBucketLayout()).get(ozoneKey);
+    List<OzoneAcl> existingAcls = existingKeyInfo.getAcls();
+    assertEquals(acls, existingAcls);
+
+    // Create a request with a generation which doesn't match the current key
+    omRequest = createKeyRequest(false, 0, 100,
+        RatisReplicationConfig.getInstance(THREE), existingKeyInfo.getGeneration() + 1, reWriteMetadata);
+    omRequest = doPreExecute(omRequest);
+    omKeyCreateRequest = getOMKeyCreateRequest(omRequest);
+    response = omKeyCreateRequest.validateAndUpdateCache(ozoneManager, 105L);
+    // Still fails, as the matching key is not present.
+    assertEquals(KEY_NOT_FOUND, response.getOMResponse().getStatus());
+
+    // Now create the key with the correct rewrite generation
+    omRequest = createKeyRequest(false, 0, 100,
+        RatisReplicationConfig.getInstance(THREE), existingKeyInfo.getGeneration(), reWriteMetadata);
+    omRequest = doPreExecute(omRequest);
+    omKeyCreateRequest = getOMKeyCreateRequest(omRequest);
+    response = omKeyCreateRequest.validateAndUpdateCache(ozoneManager, 105L);
+    assertEquals(OK, response.getOMResponse().getStatus());
+
+    // Ensure the rewriteGeneration is persisted in the open key table
+    String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
+        keyName, omRequest.getCreateKeyRequest().getClientID());
+    OmKeyInfo openKeyInfo = omMetadataManager.getOpenKeyTable(omKeyCreateRequest.getBucketLayout()).get(openKey);
+
+    assertEquals(existingKeyInfo.getGeneration(), openKeyInfo.getRewriteGeneration());
+    // Creation time should remain the same on rewrite.
+    assertEquals(existingKeyInfo.getCreationTime(), openKeyInfo.getCreationTime());
+    // Update ID should change
+    assertNotEquals(existingKeyInfo.getGeneration(), openKeyInfo.getGeneration());
+    assertEquals(metadata, existingKeyInfo.getMetadata());
+    // The metadata should not be copied from the existing key. It should be passed in the request.
+    assertEquals(reWriteMetadata, openKeyInfo.getMetadata());
+    // Ensure the ACLS are copied over from the existing key.
+    assertEquals(existingAcls, openKeyInfo.getAcls());
+  }
+
   /**
    * Leaf file has ACCESS scope acls which inherited
    * from parent DEFAULT acls.
@@ -959,9 +1054,13 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
     assertEquals(NOT_A_FILE, omClientResponse.getOMResponse().getStatus());
   }
 
-
   private void createAndCheck(String keyName) throws Exception {
-    OMRequest omRequest = createKeyRequest(false, 0, keyName);
+    createAndCheck(keyName, Collections.emptyMap(), Collections.emptyList());
+  }
+
+  private OmKeyInfo createAndCheck(String keyName, Map<String, String> metadata, List<OzoneAcl> acls)
+      throws Exception {
+    OMRequest omRequest = createKeyRequest(false, 0, keyName, metadata, acls);
 
     OMKeyCreateRequest omKeyCreateRequest = getOMKeyCreateRequest(omRequest);
 
@@ -974,10 +1073,10 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
 
     assertEquals(OK, omClientResponse.getOMResponse().getStatus());
 
-    checkCreatedPaths(omKeyCreateRequest, omRequest, keyName);
+    return checkCreatedPaths(omKeyCreateRequest, omRequest, keyName);
   }
 
-  protected void checkCreatedPaths(
+  protected OmKeyInfo checkCreatedPaths(
       OMKeyCreateRequest omKeyCreateRequest, OMRequest omRequest,
       String keyName) throws Exception {
     keyName = omKeyCreateRequest.validateAndNormalizeKey(true, keyName);
@@ -992,6 +1091,7 @@ public class TestOMKeyCreateRequest extends TestOMKeyRequest {
         omMetadataManager.getOpenKeyTable(omKeyCreateRequest.getBucketLayout())
             .get(openKey);
     assertNotNull(omKeyInfo);
+    return omKeyInfo;
   }
 
   protected long checkIntermediatePaths(Path keyPath) throws Exception {
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
index 2a25a9b096..87badb2812 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
@@ -117,7 +117,7 @@ public class TestOMKeyCreateRequestWithFSO extends TestOMKeyCreateRequest {
   }
 
   @Override
-  protected void checkCreatedPaths(OMKeyCreateRequest omKeyCreateRequest,
+  protected OmKeyInfo checkCreatedPaths(OMKeyCreateRequest omKeyCreateRequest,
       OMRequest omRequest, String keyName) throws Exception {
     keyName = omKeyCreateRequest.validateAndNormalizeKey(true, keyName,
         BucketLayout.FILE_SYSTEM_OPTIMIZED);
@@ -139,6 +139,7 @@ public class TestOMKeyCreateRequestWithFSO extends TestOMKeyCreateRequest {
         omMetadataManager.getOpenKeyTable(omKeyCreateRequest.getBucketLayout())
             .get(openKey);
     assertNotNull(omKeyInfo);
+    return omKeyInfo;
   }
 
   @Override
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
index bc562d5d93..d7d092c0e5 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
@@ -227,6 +227,14 @@ public class ClientProtocolStub implements ClientProtocol {
         .createKey(keyName, size, replicationConfig, metadata);
   }
 
+  @Override
+  public OzoneOutputStream rewriteKey(String volumeName, String bucketName, String keyName,
+      long size, long existingKeyGeneration, ReplicationConfig replicationConfig,
+      Map<String, String> metadata) throws IOException {
+    return getBucket(volumeName, bucketName)
+        .rewriteKey(keyName, size, existingKeyGeneration, replicationConfig, metadata);
+  }
+
   @Override
   public OzoneInputStream getKey(String volumeName, String bucketName,
                                  String keyName) throws IOException {
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index db74bd562c..0dc5488676 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -157,6 +157,38 @@ public final class OzoneBucketStub extends OzoneBucket {
     return new OzoneOutputStream(byteArrayOutputStream, null);
   }
 
+  @Override
+  public OzoneOutputStream rewriteKey(String keyName, long size, long existingKeyGeneration,
+      ReplicationConfig rConfig, Map<String, String> metadata) throws IOException {
+    final ReplicationConfig repConfig;
+    if (rConfig == null) {
+      repConfig = getReplicationConfig();
+    } else {
+      repConfig = rConfig;
+    }
+    ReplicationConfig finalReplicationCon = repConfig;
+    ByteArrayOutputStream byteArrayOutputStream =
+        new KeyMetadataAwareOutputStream(metadata) {
+          @Override
+          public void close() throws IOException {
+            keyContents.put(keyName, toByteArray());
+            keyDetails.put(keyName, new OzoneKeyDetails(
+                getVolumeName(),
+                getName(),
+                keyName,
+                size,
+                System.currentTimeMillis(),
+                System.currentTimeMillis(),
+                new ArrayList<>(), finalReplicationCon, metadata, null,
+                () -> readKey(keyName), true, null, null
+            ));
+            super.close();
+          }
+        };
+
+    return new OzoneOutputStream(byteArrayOutputStream, null);
+  }
+
   @Override
   public OzoneDataStreamOutput createStreamKey(String key, long size,
                                                ReplicationConfig rConfig,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to