This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new ed6f046c18 HDDS-9146. Potential data loss with HSync due to 
deletedTable entry having the same block as keyTable entry's (#5167)
ed6f046c18 is described below

commit ed6f046c18f5f1a542aacfb7effe3174eba3b3af
Author: Siyao Meng <[email protected]>
AuthorDate: Fri Aug 11 00:49:10 2023 -0700

    HDDS-9146. Potential data loss with HSync due to deletedTable entry having 
the same block as keyTable entry's (#5167)
---
 .../hadoop/hdds/scm/storage/BlockLocationInfo.java |   5 +-
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  |  19 ++++
 .../ozone/om/helpers/OmKeyLocationInfoGroup.java   |  14 ++-
 .../hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java |   7 ++
 .../java/org/apache/hadoop/fs/ozone/TestHSync.java |  62 +++++++++++
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  13 +++
 .../ozone/om/ratis/OzoneManagerDoubleBuffer.java   |   2 +-
 .../ozone/om/request/key/OMKeyCommitRequest.java   |  33 +++++-
 .../om/request/key/OMKeyCommitRequestWithFSO.java  |  27 ++++-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  | 113 ++++++++++++++++++++-
 ...OzoneManagerProtocolServerSideTranslatorPB.java |   9 ++
 .../om/request/key/TestOMKeyCommitRequest.java     |  59 +++++++----
 .../ozone/om/request/key/TestOMKeyRequest.java     |   7 ++
 13 files changed, 341 insertions(+), 29 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
index cf368b0680..019e16c2f1 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
@@ -164,9 +164,8 @@ public class BlockLocationInfo {
   }
 
   @Override
-  public String  toString() {
-    return "{blockID={containerID=" + blockID.getContainerID() +
-        ", localID=" + blockID.getLocalID() + "}" +
+  public String toString() {
+    return "{blockID={" + blockID + "}" +
         ", length=" + length +
         ", offset=" + offset +
         ", token=" + token +
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index e48cf98e90..57bcd39f6f 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -425,6 +425,25 @@ public final class OmKeyInfo extends WithParentObjectId
     return fileChecksum;
   }
 
+  @Override
+  public String toString() {
+    return "OmKeyInfo{" +
+        "volumeName='" + volumeName + '\'' +
+        ", bucketName='" + bucketName + '\'' +
+        ", keyName='" + keyName + '\'' +
+        ", dataSize=" + dataSize +
+        ", keyLocationVersions=" + keyLocationVersions +
+        ", creationTime=" + creationTime +
+        ", modificationTime=" + modificationTime +
+        ", replicationConfig=" + replicationConfig +
+        ", encInfo=" + (encInfo == null ? "null" : "<REDACTED>") +
+        ", fileChecksum=" + fileChecksum +
+        ", isFile=" + isFile +
+        ", fileName='" + fileName + '\'' +
+        ", acls=" + acls +
+        '}';
+  }
+
   /**
    * Builder of OmKeyInfo.
    */
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfoGroup.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfoGroup.java
index 931657e8e7..e934ef1b22 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfoGroup.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfoGroup.java
@@ -80,6 +80,13 @@ public class OmKeyLocationInfoGroup {
     return isMultipartKey;
   }
 
+  /**
+   * @return Raw internal locationVersionMap.
+   */
+  public Map<Long, List<OmKeyLocationInfo>> getLocationVersionMap() {
+    return locationVersionMap;
+  }
+
   /**
    * Return only the blocks that are created in the most recent version.
    *
@@ -182,7 +189,12 @@ public class OmKeyLocationInfoGroup {
     sb.append("isMultipartKey:").append(isMultipartKey).append(" ");
     for (List<OmKeyLocationInfo> kliList : locationVersionMap.values()) {
       for (OmKeyLocationInfo kli: kliList) {
-        sb.append(kli.getLocalID()).append(" || ");
+        sb.append("conID ").append(kli.getContainerID());
+        sb.append(" ");
+        sb.append("locID ").append(kli.getLocalID());
+        sb.append(" ");
+        sb.append("bcsID ").append(kli.getBlockCommitSequenceId());
+        sb.append(" || ");
       }
     }
     return sb.toString();
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java
index 48c00cef35..2ee5420a4c 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java
@@ -125,6 +125,13 @@ public class RepeatedOmKeyInfo implements 
CopyObject<RepeatedOmKeyInfo> {
     return builder.build();
   }
 
+  @Override
+  public String toString() {
+    return "RepeatedOmKeyInfo{" +
+        "omKeyInfoList=" + omKeyInfoList +
+        '}';
+  }
+
   /**
    * Builder of RepeatedOmKeyInfo.
    */
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 07a5cc3a48..b313aa80fb 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -44,6 +44,8 @@ import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -54,10 +56,17 @@ import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequest;
+import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequestWithFSO;
+import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
+import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -65,7 +74,9 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
@@ -122,6 +133,11 @@ public class TestHSync {
 
     // create a volume and a bucket to be used by OzoneFileSystem
     bucket = TestDataUtil.createVolumeAndBucket(client, layout);
+
+    // Enable DEBUG level logging for relevant classes
+    GenericTestUtils.setLogLevel(OMKeyRequest.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(OMKeyCommitRequest.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(OMKeyCommitRequestWithFSO.LOG, Level.DEBUG);
   }
 
   @AfterAll
@@ -132,6 +148,52 @@ public class TestHSync {
     }
   }
 
+  @Test
+  public void testKeyHSyncThenClose() throws Exception {
+    // Check that deletedTable does not have keys sharing a block with a
+    // keyTable entry when a key is hsync()'ed then close()'d.
+
+    // Set the fs.defaultFS
+    final String rootPath = String.format("%s://%s/",
+        OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
+    CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+    final String dir = OZONE_ROOT + bucket.getVolumeName()
+        + OZONE_URI_DELIMITER + bucket.getName();
+
+    String data = "random data";
+    final Path file = new Path(dir, "file-hsync-then-close");
+    try (FileSystem fs = FileSystem.get(CONF)) {
+      try (FSDataOutputStream outputStream = fs.create(file, true)) {
+        outputStream.write(data.getBytes(UTF_8), 0, data.length());
+        outputStream.hsync();
+      }
+    }
+
+    OzoneManager ozoneManager = cluster.getOzoneManager();
+    // Wait for double buffer to trigger all pending addToDBBatch(),
+    // including OMKeyCommitResponse(WithFSO)'s that write to deletedTable.
+    ozoneManager.awaitDoubleBufferFlush();
+
+    OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
+    // deletedTable should not have an entry for file at all in this case
+    try (TableIterator<String,
+        ? extends Table.KeyValue<String, RepeatedOmKeyInfo>>
+        tableIter = metadataManager.getDeletedTable().iterator()) {
+      while (tableIter.hasNext()) {
+        Table.KeyValue<String, RepeatedOmKeyInfo> kv = tableIter.next();
+        String key = kv.getKey();
+        if (key.startsWith(file.toString())) {
+          RepeatedOmKeyInfo val = kv.getValue();
+          LOG.error("Unexpected deletedTable entry: key = {}, val = {}",
+              key, val);
+          Assertions.fail("deletedTable should not have such entry. key = " +
+              key);
+        }
+      }
+    }
+  }
+
   @Test
   public void testO3fsHSync() throws Exception {
     // Set the fs.defaultFS
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index f5bd2ad602..8c0950d6df 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -4709,4 +4709,17 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   public ReconfigurationHandler getReconfigurationHandler() {
     return reconfigurationHandler;
   }
+
+  /**
+   * Wait until both buffers are flushed.  This is used in cases like
+   * "follower bootstrap tarball creation" where the rocksDb for the active
+   * fs needs to be synchronized with the rocksdb's for the snapshots.
+   */
+  public void awaitDoubleBufferFlush() throws InterruptedException {
+    if (isRatisEnabled()) {
+      getOmRatisServer().getOmStateMachine().awaitDoubleBufferFlush();
+    } else {
+      getOmServerProtocol().awaitDoubleBufferFlush();
+    }
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index ab4e99470e..2a1cca4e1d 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -678,7 +678,7 @@ public final class OzoneManagerDoubleBuffer {
     isRunning.set(true);
   }
 
-  void awaitFlush() throws InterruptedException {
+  public void awaitFlush() throws InterruptedException {
     flushNotifier.await();
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index 3053655904..33bf839167 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.ozone.OmUtils;
@@ -74,7 +75,8 @@ import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_L
  */
 public class OMKeyCommitRequest extends OMKeyRequest {
 
-  private static final Logger LOG =
+  @VisibleForTesting
+  public static final Logger LOG =
       LoggerFactory.getLogger(OMKeyCommitRequest.class);
 
   public OMKeyCommitRequest(OMRequest omRequest, BucketLayout bucketLayout) {
@@ -134,7 +136,6 @@ public class OMKeyCommitRequest extends OMKeyRequest {
     String keyName = commitKeyArgs.getKeyName();
 
     OMMetrics omMetrics = ozoneManager.getMetrics();
-    omMetrics.incNumKeyCommits();
 
     AuditLogger auditLogger = ozoneManager.getAuditLogger();
 
@@ -154,6 +155,16 @@ public class OMKeyCommitRequest extends OMKeyRequest {
 
     boolean isHSync = commitKeyRequest.hasHsync() &&
             commitKeyRequest.getHsync();
+
+    if (isHSync) {
+      omMetrics.incNumKeyHSyncs();
+    } else {
+      omMetrics.incNumKeyCommits();
+    }
+
+    LOG.debug("isHSync = {}, volumeName = {}, bucketName = {}, keyName = {}",
+        isHSync, volumeName, bucketName, keyName);
+
     try {
       commitKeyArgs = resolveBucketLink(ozoneManager, commitKeyArgs, auditMap);
       volumeName = commitKeyArgs.getVolumeName();
@@ -253,7 +264,16 @@ public class OMKeyCommitRequest extends OMKeyRequest {
         if (null == oldKeyVersionsToDeleteMap) {
           oldKeyVersionsToDeleteMap = new HashMap<>();
         }
-        oldKeyVersionsToDeleteMap.put(delKeyName, oldVerKeyInfo);
+
+        // Remove any block from oldVerKeyInfo that shares a container ID and
+        // local ID with one of omKeyInfo's blocks.
+        // Otherwise, it causes data loss once those shared blocks are added
+        // to deletedTable and processed by KeyDeletingService for deletion.
+        filterOutBlocksStillInUse(omKeyInfo, oldVerKeyInfo);
+
+        if (!oldVerKeyInfo.getOmKeyInfoList().isEmpty()) {
+          oldKeyVersionsToDeleteMap.put(delKeyName, oldVerKeyInfo);
+        }
       } else {
         checkBucketQuotaInNamespace(omBucketInfo, 1L);
         checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
@@ -278,6 +298,9 @@ public class OMKeyCommitRequest extends OMKeyRequest {
 
       // Add to cache of open key table and key table.
       if (!isHSync) {
+        // If isHSync = false, put a tombstone in OpenKeyTable cache,
+        // indicating the key is removed from OpenKeyTable.
+        // So that this key can't be committed again.
         omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
             dbOpenKey, trxnLogIndex);
       }
@@ -307,6 +330,10 @@ public class OMKeyCommitRequest extends OMKeyRequest {
       }
     }
 
+    // Debug logging for any key commit operation, successful or not
+    LOG.debug("Key commit {} with isHSync = {}, omKeyInfo = {}",
+        result == Result.SUCCESS ? "succeeded" : "failed", isHSync, omKeyInfo);
+
     if (!isHSync) {
       auditLog(auditLogger, buildAuditMessage(OMAction.COMMIT_KEY, auditMap,
               exception, getOmRequest().getUserInfo()));
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index fa7f92e9a8..9b4094a381 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.ozone.om.request.key;
 
 import java.util.HashMap;
+
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.OMAction;
@@ -60,7 +62,8 @@ import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_L
  */
 public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
 
-  private static final Logger LOG =
+  @VisibleForTesting
+  public static final Logger LOG =
       LoggerFactory.getLogger(OMKeyCommitRequestWithFSO.class);
 
   public OMKeyCommitRequestWithFSO(OMRequest omRequest,
@@ -98,12 +101,16 @@ public class OMKeyCommitRequestWithFSO extends 
OMKeyCommitRequest {
     Result result;
     boolean isHSync = commitKeyRequest.hasHsync() &&
         commitKeyRequest.getHsync();
+
     if (isHSync) {
       omMetrics.incNumKeyHSyncs();
     } else {
       omMetrics.incNumKeyCommits();
     }
 
+    LOG.debug("isHSync = {}, volumeName = {}, bucketName = {}, keyName = {}",
+        isHSync, volumeName, bucketName, keyName);
+
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
 
     try {
@@ -199,7 +206,16 @@ public class OMKeyCommitRequestWithFSO extends 
OMKeyCommitRequest {
         if (null == oldKeyVersionsToDeleteMap) {
           oldKeyVersionsToDeleteMap = new HashMap<>();
         }
-        oldKeyVersionsToDeleteMap.put(delKeyName, oldVerKeyInfo);
+
+        // Remove any block from oldVerKeyInfo that shares a container ID and
+        // local ID with one of omKeyInfo's blocks.
+        // Otherwise, it causes data loss once those shared blocks are added
+        // to deletedTable and processed by KeyDeletingService for deletion.
+        filterOutBlocksStillInUse(omKeyInfo, oldVerKeyInfo);
+
+        if (!oldVerKeyInfo.getOmKeyInfoList().isEmpty()) {
+          oldKeyVersionsToDeleteMap.put(delKeyName, oldVerKeyInfo);
+        }
       } else {
         checkBucketQuotaInNamespace(omBucketInfo, 1L);
         checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
@@ -226,6 +242,9 @@ public class OMKeyCommitRequestWithFSO extends 
OMKeyCommitRequest {
 
       // Add to cache of open key table and key table.
       if (!isHSync) {
+        // If isHSync = false, put a tombstone in OpenKeyTable cache,
+        // indicating the key is removed from OpenKeyTable.
+        // So that this key can't be committed again.
         OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager,
             dbOpenFileKey, null, fileName, trxnLogIndex);
       }
@@ -255,6 +274,10 @@ public class OMKeyCommitRequestWithFSO extends 
OMKeyCommitRequest {
       }
     }
 
+    // Debug logging for any key commit operation, successful or not
+    LOG.debug("Key commit {} with isHSync = {}, omKeyInfo = {}",
+        result == Result.SUCCESS ? "succeeded" : "failed", isHSync, omKeyInfo);
+
     if (!isHSync) {
       auditLog(auditLogger, buildAuditMessage(OMAction.COMMIT_KEY, auditMap,
               exception, getOmRequest().getUserInfo()));
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 6d079359ea..13bdbbca3a 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -26,12 +26,17 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.client.ContainerBlockID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
@@ -100,7 +105,8 @@ import static org.apache.hadoop.util.Time.monotonicNow;
  */
 public abstract class OMKeyRequest extends OMClientRequest {
 
-  private static final Logger LOG = LoggerFactory.getLogger(OMKeyRequest.class);
+  @VisibleForTesting
+  public static final Logger LOG = LoggerFactory.getLogger(OMKeyRequest.class);
 
   private BucketLayout bucketLayout = BucketLayout.DEFAULT;
 
@@ -844,4 +850,109 @@ public abstract class OMKeyRequest extends 
OMClientRequest {
     pseudoKeyInfo.setKeyLocationVersions(uncommittedGroups);
     return pseudoKeyInfo;
   }
+
+  /**
+   * Remove blocks in-place from keysToBeFiltered that exist in referenceKey.
+   * <p>
+   * keysToBeFiltered.getOmKeyInfoList() becomes an empty list when all blocks
+   * are filtered out.
+   *
+   * @param referenceKey OmKeyInfo
+   * @param keysToBeFiltered RepeatedOmKeyInfo
+   */
+  protected void filterOutBlocksStillInUse(OmKeyInfo referenceKey,
+                                           RepeatedOmKeyInfo keysToBeFiltered) {
+
+    LOG.debug("Before block filtering, keysToBeFiltered = {}",
+        keysToBeFiltered);
+
+    // A HashSet for fast lookup. Gathers all ContainerBlockID entries inside
+    // the referenceKey.
+    HashSet<ContainerBlockID> cbIdSet = referenceKey.getKeyLocationVersions()
+        .stream()
+        .flatMap(e -> e.getLocationList().stream())
+        .map(omKeyLocationInfo ->
+            omKeyLocationInfo.getBlockID().getContainerBlockID())
+        .collect(Collectors.toCollection(HashSet::new));
+
+    // Pardon the nested loops. ContainerBlockID is 9-layer deep from:
+    // keysToBeFiltered               // Layer 0. RepeatedOmKeyInfo
+    //     .getOmKeyInfoList()        // 1. List<OmKeyInfo>
+    //     .get(0)                    // 2. OmKeyInfo
+    //     .getKeyLocationVersions()  // 3. List<OmKeyLocationInfoGroup>
+    //     .get(0)                    // 4. OmKeyLocationInfoGroup
+    //     .getLocationVersionMap()   // 5. Map<Long, List<OmKeyLocationInfo>>
+    //     .get(version)              // 6. List<OmKeyLocationInfo>
+    //     .get(0)                    // 7. OmKeyLocationInfo
+    //     .getBlockID()              // 8. BlockID
+    //     .getContainerBlockID();    // 9. ContainerBlockID
+
+    // Using iterator instead of `for` or `forEach` for in-place entry removal
+
+    // Layer 1: List<OmKeyInfo>
+    Iterator<OmKeyInfo> iterOmKeyInfo = keysToBeFiltered
+        .getOmKeyInfoList().iterator();
+
+    while (iterOmKeyInfo.hasNext()) {
+      // Note with HDDS-8462, each RepeatedOmKeyInfo should have only one entry,
+      // so this outermost loop should never be entered twice in each call.
+
+      // But for completeness' sake I shall put it here.
+      // Remove only when RepeatedOmKeyInfo is no longer used.
+
+      // Layer 2: OmKeyInfo
+      OmKeyInfo oldOmKeyInfo = iterOmKeyInfo.next();
+      // Layer 3: List<OmKeyLocationInfoGroup>
+      Iterator<OmKeyLocationInfoGroup> iterKeyLocInfoGroup = oldOmKeyInfo
+          .getKeyLocationVersions().iterator();
+      while (iterKeyLocInfoGroup.hasNext()) {
+        // Layer 4: OmKeyLocationInfoGroup
+        OmKeyLocationInfoGroup keyLocInfoGroup = iterKeyLocInfoGroup.next();
+        // Layer 5: Map<Long, List<OmKeyLocationInfo>>
+        Iterator<Map.Entry<Long, List<OmKeyLocationInfo>>> iterVerMap =
+            keyLocInfoGroup.getLocationVersionMap().entrySet().iterator();
+
+        while (iterVerMap.hasNext()) {
+          Map.Entry<Long, List<OmKeyLocationInfo>> mapEntry = iterVerMap.next();
+          // Layer 6: List<OmKeyLocationInfo>
+          List<OmKeyLocationInfo> omKeyLocationInfoList = mapEntry.getValue();
+
+          Iterator<OmKeyLocationInfo> iterKeyLocInfo =
+              omKeyLocationInfoList.iterator();
+          while (iterKeyLocInfo.hasNext()) {
+            // Layer 7: OmKeyLocationInfo
+            OmKeyLocationInfo keyLocationInfo = iterKeyLocInfo.next();
+            // Layer 8: BlockID. Then Layer 9: ContainerBlockID
+            ContainerBlockID cbId = keyLocationInfo
+                .getBlockID().getContainerBlockID();
+
+            if (cbIdSet.contains(cbId)) {
+              // Remove this block from oldVerKeyInfo because it is referenced.
+              iterKeyLocInfo.remove();
+              LOG.debug("Filtered out block: {}", cbId);
+            }
+          }
+
+          // Cleanup when Layer 6 is an empty list
+          if (omKeyLocationInfoList.isEmpty()) {
+            iterVerMap.remove();
+          }
+        }
+
+        // Cleanup when Layer 5 is an empty map
+        if (keyLocInfoGroup.getLocationVersionMap().isEmpty()) {
+          iterKeyLocInfoGroup.remove();
+        }
+      }
+
+      // Cleanup when Layer 3 is an empty list
+      if (oldOmKeyInfo.getKeyLocationVersions().isEmpty()) {
+        iterOmKeyInfo.remove();
+      }
+    }
+
+    // Intentional extra space for alignment
+    LOG.debug("After block filtering,  keysToBeFiltered = {}",
+        keysToBeFiltered);
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 17adbd13d0..c8e9b679cf 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -331,4 +331,13 @@ public class OzoneManagerProtocolServerSideTranslatorPB 
implements
   public static Logger getLog() {
     return LOG;
   }
+
+  /**
+   * Wait until both buffers are flushed.  This is used in cases like
+   * "follower bootstrap tarball creation" where the rocksDb for the active
+   * fs needs to be synchronized with the rocksdb's for the snapshots.
+   */
+  public void awaitDoubleBufferFlush() throws InterruptedException {
+    ozoneManagerDoubleBuffer.awaitFlush();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
index 92a484c919..4c610141b6 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
@@ -313,8 +313,13 @@ public class TestOMKeyCommitRequest extends 
TestOMKeyRequest {
         omMetadataManager, bucketLayout);
     List<KeyLocation> allocatedKeyLocationList = getKeyLocation(10);
 
+    // hsync should throw OMException
     assertThrows(OMException.class, () ->
-        performHsyncCommit(allocatedKeyLocationList.subList(0, 5)));
+        doKeyCommit(true, allocatedKeyLocationList.subList(0, 5)));
+
+    // Regular key commit should still work
+    doKeyCommit(false, allocatedKeyLocationList.subList(0, 5));
+
     conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
   }
 
@@ -330,26 +335,38 @@ public class TestOMKeyCommitRequest extends 
TestOMKeyRequest {
         .get(bucketKey);
     long usedBytes = bucketInfo.getUsedBytes();
 
-    performHsyncCommit(allocatedKeyLocationList.subList(0, 5));
-    bucketInfo = omMetadataManager.getBucketTable()
-        .get(bucketKey);
+    // 1st commit of 3 blocks, HSync = true
+    Map<String, RepeatedOmKeyInfo> keyToDeleteMap =
+        doKeyCommit(true, allocatedKeyLocationList.subList(0, 3));
+    Assert.assertNull(keyToDeleteMap);
+    bucketInfo = omMetadataManager.getBucketTable().get(bucketKey);
     long firstCommitUsedBytes = bucketInfo.getUsedBytes();
-    Assert.assertEquals(500, firstCommitUsedBytes - usedBytes);
-
-    performHsyncCommit(allocatedKeyLocationList);
-    bucketInfo = omMetadataManager.getBucketTable()
-        .get(bucketKey);
-    long nextCommitUsedBytes = bucketInfo.getUsedBytes();
-
-    Assert.assertEquals(1000, nextCommitUsedBytes - usedBytes);
+    Assert.assertEquals(300, firstCommitUsedBytes - usedBytes);
+
+    // 2nd commit of 6 blocks, HSync = true
+    keyToDeleteMap = doKeyCommit(true, allocatedKeyLocationList.subList(0, 6));
+    Assert.assertNull(keyToDeleteMap);
+    bucketInfo = omMetadataManager.getBucketTable().get(bucketKey);
+    long secondCommitUsedBytes = bucketInfo.getUsedBytes();
+    Assert.assertEquals(600, secondCommitUsedBytes - usedBytes);
+
+    // 3rd and final commit of all 10 blocks, HSync = false
+    keyToDeleteMap = doKeyCommit(false, allocatedKeyLocationList);
+    // keyToDeleteMap should be empty because none of the previous blocks
+    // should be deleted.
+    Assert.assertNotNull(keyToDeleteMap);
+    Assert.assertTrue(keyToDeleteMap.isEmpty());
+    bucketInfo = omMetadataManager.getBucketTable().get(bucketKey);
+    long thirdCommitUsedBytes = bucketInfo.getUsedBytes();
+    Assert.assertEquals(1000, thirdCommitUsedBytes - usedBytes);
   }
-  
-  private List<KeyLocation> performHsyncCommit(
+
+  private Map<String, RepeatedOmKeyInfo> doKeyCommit(boolean isHSync,
       List<KeyLocation> keyLocations) throws Exception {
     // allocated block list
     dataSize = keyLocations.size() * 100;
     OMRequest modifiedOmRequest = doPreExecute(createCommitKeyRequest(
-        keyLocations, true));
+        keyLocations, isHSync));
     OMKeyCommitRequest omKeyCommitRequest =
         getOmKeyCommitRequest(modifiedOmRequest);
 
@@ -365,16 +382,22 @@ public class TestOMKeyCommitRequest extends 
TestOMKeyRequest {
     Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
         omClientResponse.getOMResponse().getStatus());
 
-    // key must be prsent in both open key table and key table for hsync
+    // Key should be present in both OpenKeyTable and KeyTable with HSync commit
     OmKeyInfo omKeyInfo =
         omMetadataManager.getOpenKeyTable(
             omKeyCommitRequest.getBucketLayout()).get(openKey);
-    Assert.assertNotNull(omKeyInfo);
+    if (isHSync) {
+      Assert.assertNotNull(omKeyInfo);
+    } else {
+      // Key should not exist in OpenKeyTable anymore with non-HSync commit
+      Assert.assertNull(omKeyInfo);
+    }
     omKeyInfo =
         omMetadataManager.getKeyTable(omKeyCommitRequest.getBucketLayout())
             .get(ozoneKey);
     Assert.assertNotNull(omKeyInfo);
-    return keyLocations;
+
+    return ((OMKeyCommitResponse) omClientResponse).getKeysToDelete();
   }
 
   @Test
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index 37ad06f5d7..0954382b78 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -46,6 +46,7 @@ import 
org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import org.apache.hadoop.ozone.security.acl.OzoneNativeAuthorizer;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ozone.test.GenericTestUtils;
 import org.jetbrains.annotations.NotNull;
 import org.junit.After;
 import org.junit.Assert;
@@ -73,6 +74,7 @@ import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.ScmClient;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.util.Time;
+import org.slf4j.event.Level;
 
 import static 
org.apache.hadoop.ozone.om.request.OMRequestTestUtils.setupReplicationConfigValidation;
 import static org.mockito.ArgumentMatchers.any;
@@ -231,6 +233,11 @@ public class TestOMKeyRequest {
     OmSnapshotManager omSnapshotManager = new OmSnapshotManager(ozoneManager);
     when(ozoneManager.getOmSnapshotManager())
         .thenReturn(omSnapshotManager);
+
+    // Enable DEBUG level logging for relevant classes
+    GenericTestUtils.setLogLevel(OMKeyRequest.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(OMKeyCommitRequest.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(OMKeyCommitRequestWithFSO.LOG, Level.DEBUG);
   }
 
   @NotNull


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to