This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new ed6f046c18 HDDS-9146. Potential data loss with HSync due to
deletedTable entry having the same block as keyTable entry's (#5167)
ed6f046c18 is described below
commit ed6f046c18f5f1a542aacfb7effe3174eba3b3af
Author: Siyao Meng <[email protected]>
AuthorDate: Fri Aug 11 00:49:10 2023 -0700
HDDS-9146. Potential data loss with HSync due to deletedTable entry having
the same block as keyTable entry's (#5167)
---
.../hadoop/hdds/scm/storage/BlockLocationInfo.java | 5 +-
.../apache/hadoop/ozone/om/helpers/OmKeyInfo.java | 19 ++++
.../ozone/om/helpers/OmKeyLocationInfoGroup.java | 14 ++-
.../hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java | 7 ++
.../java/org/apache/hadoop/fs/ozone/TestHSync.java | 62 +++++++++++
.../org/apache/hadoop/ozone/om/OzoneManager.java | 13 +++
.../ozone/om/ratis/OzoneManagerDoubleBuffer.java | 2 +-
.../ozone/om/request/key/OMKeyCommitRequest.java | 33 +++++-
.../om/request/key/OMKeyCommitRequestWithFSO.java | 27 ++++-
.../hadoop/ozone/om/request/key/OMKeyRequest.java | 113 ++++++++++++++++++++-
...OzoneManagerProtocolServerSideTranslatorPB.java | 9 ++
.../om/request/key/TestOMKeyCommitRequest.java | 59 +++++++----
.../ozone/om/request/key/TestOMKeyRequest.java | 7 ++
13 files changed, 341 insertions(+), 29 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
index cf368b0680..019e16c2f1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
@@ -164,9 +164,8 @@ public class BlockLocationInfo {
}
@Override
- public String toString() {
- return "{blockID={containerID=" + blockID.getContainerID() +
- ", localID=" + blockID.getLocalID() + "}" +
+ public String toString() {
+ return "{blockID={" + blockID + "}" +
", length=" + length +
", offset=" + offset +
", token=" + token +
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index e48cf98e90..57bcd39f6f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -425,6 +425,25 @@ public final class OmKeyInfo extends WithParentObjectId
return fileChecksum;
}
+ @Override
+ public String toString() {
+ return "OmKeyInfo{" +
+ "volumeName='" + volumeName + '\'' +
+ ", bucketName='" + bucketName + '\'' +
+ ", keyName='" + keyName + '\'' +
+ ", dataSize=" + dataSize +
+ ", keyLocationVersions=" + keyLocationVersions +
+ ", creationTime=" + creationTime +
+ ", modificationTime=" + modificationTime +
+ ", replicationConfig=" + replicationConfig +
+ ", encInfo=" + (encInfo == null ? "null" : "<REDACTED>") +
+ ", fileChecksum=" + fileChecksum +
+ ", isFile=" + isFile +
+ ", fileName='" + fileName + '\'' +
+ ", acls=" + acls +
+ '}';
+ }
+
/**
* Builder of OmKeyInfo.
*/
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfoGroup.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfoGroup.java
index 931657e8e7..e934ef1b22 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfoGroup.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfoGroup.java
@@ -80,6 +80,13 @@ public class OmKeyLocationInfoGroup {
return isMultipartKey;
}
+ /**
+ * @return Raw internal locationVersionMap.
+ */
+ public Map<Long, List<OmKeyLocationInfo>> getLocationVersionMap() {
+ return locationVersionMap;
+ }
+
/**
* Return only the blocks that are created in the most recent version.
*
@@ -182,7 +189,12 @@ public class OmKeyLocationInfoGroup {
sb.append("isMultipartKey:").append(isMultipartKey).append(" ");
for (List<OmKeyLocationInfo> kliList : locationVersionMap.values()) {
for (OmKeyLocationInfo kli: kliList) {
- sb.append(kli.getLocalID()).append(" || ");
+ sb.append("conID ").append(kli.getContainerID());
+ sb.append(" ");
+ sb.append("locID ").append(kli.getLocalID());
+ sb.append(" ");
+ sb.append("bcsID ").append(kli.getBlockCommitSequenceId());
+ sb.append(" || ");
}
}
return sb.toString();
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java
index 48c00cef35..2ee5420a4c 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java
@@ -125,6 +125,13 @@ public class RepeatedOmKeyInfo implements CopyObject<RepeatedOmKeyInfo> {
return builder.build();
}
+ @Override
+ public String toString() {
+ return "RepeatedOmKeyInfo{" +
+ "omKeyInfoList=" + omKeyInfoList +
+ '}';
+ }
+
/**
* Builder of RepeatedOmKeyInfo.
*/
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 07a5cc3a48..b313aa80fb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -44,6 +44,8 @@ import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -54,10 +56,17 @@ import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequest;
+import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequestWithFSO;
+import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
+import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -65,7 +74,9 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
@@ -122,6 +133,11 @@ public class TestHSync {
// create a volume and a bucket to be used by OzoneFileSystem
bucket = TestDataUtil.createVolumeAndBucket(client, layout);
+
+ // Enable DEBUG level logging for relevant classes
+ GenericTestUtils.setLogLevel(OMKeyRequest.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(OMKeyCommitRequest.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(OMKeyCommitRequestWithFSO.LOG, Level.DEBUG);
}
@AfterAll
@@ -132,6 +148,52 @@ public class TestHSync {
}
}
+ @Test
+ public void testKeyHSyncThenClose() throws Exception {
+ // Check that deletedTable should not have keys with the same block as in
+ // keyTable's when a key is hsync()'ed then close()'d.
+
+ // Set the fs.defaultFS
+ final String rootPath = String.format("%s://%s/",
+ OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
+ CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+ final String dir = OZONE_ROOT + bucket.getVolumeName()
+ + OZONE_URI_DELIMITER + bucket.getName();
+
+ String data = "random data";
+ final Path file = new Path(dir, "file-hsync-then-close");
+ try (FileSystem fs = FileSystem.get(CONF)) {
+ try (FSDataOutputStream outputStream = fs.create(file, true)) {
+ outputStream.write(data.getBytes(UTF_8), 0, data.length());
+ outputStream.hsync();
+ }
+ }
+
+ OzoneManager ozoneManager = cluster.getOzoneManager();
+ // Wait for double buffer to trigger all pending addToDBBatch(),
+ // including those of OMKeyCommitResponse(WithFSO), which write to deletedTable.
+ ozoneManager.awaitDoubleBufferFlush();
+
+ OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
+ // deletedTable should not have an entry for file at all in this case
+ try (TableIterator<String,
+ ? extends Table.KeyValue<String, RepeatedOmKeyInfo>>
+ tableIter = metadataManager.getDeletedTable().iterator()) {
+ while (tableIter.hasNext()) {
+ Table.KeyValue<String, RepeatedOmKeyInfo> kv = tableIter.next();
+ String key = kv.getKey();
+ if (key.startsWith(file.toString())) {
+ RepeatedOmKeyInfo val = kv.getValue();
+ LOG.error("Unexpected deletedTable entry: key = {}, val = {}",
+ key, val);
+ Assertions.fail("deletedTable should not have such entry. key = " +
+ key);
+ }
+ }
+ }
+ }
+
@Test
public void testO3fsHSync() throws Exception {
// Set the fs.defaultFS
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index f5bd2ad602..8c0950d6df 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -4709,4 +4709,17 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
public ReconfigurationHandler getReconfigurationHandler() {
return reconfigurationHandler;
}
+
+ /**
+ * Wait until both buffers are flushed. This is used in cases like
+ * "follower bootstrap tarball creation" where the rocksDb for the active
+ * fs needs to be synchronized with the RocksDB instances for the snapshots.
+ */
+ public void awaitDoubleBufferFlush() throws InterruptedException {
+ if (isRatisEnabled()) {
+ getOmRatisServer().getOmStateMachine().awaitDoubleBufferFlush();
+ } else {
+ getOmServerProtocol().awaitDoubleBufferFlush();
+ }
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index ab4e99470e..2a1cca4e1d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -678,7 +678,7 @@ public final class OzoneManagerDoubleBuffer {
isRunning.set(true);
}
- void awaitFlush() throws InterruptedException {
+ public void awaitFlush() throws InterruptedException {
flushNotifier.await();
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index 3053655904..33bf839167 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.ozone.OmUtils;
@@ -74,7 +75,8 @@ import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_L
*/
public class OMKeyCommitRequest extends OMKeyRequest {
- private static final Logger LOG =
+ @VisibleForTesting
+ public static final Logger LOG =
LoggerFactory.getLogger(OMKeyCommitRequest.class);
public OMKeyCommitRequest(OMRequest omRequest, BucketLayout bucketLayout) {
@@ -134,7 +136,6 @@ public class OMKeyCommitRequest extends OMKeyRequest {
String keyName = commitKeyArgs.getKeyName();
OMMetrics omMetrics = ozoneManager.getMetrics();
- omMetrics.incNumKeyCommits();
AuditLogger auditLogger = ozoneManager.getAuditLogger();
@@ -154,6 +155,16 @@ public class OMKeyCommitRequest extends OMKeyRequest {
boolean isHSync = commitKeyRequest.hasHsync() &&
commitKeyRequest.getHsync();
+
+ if (isHSync) {
+ omMetrics.incNumKeyHSyncs();
+ } else {
+ omMetrics.incNumKeyCommits();
+ }
+
+ LOG.debug("isHSync = {}, volumeName = {}, bucketName = {}, keyName = {}",
+ isHSync, volumeName, bucketName, keyName);
+
try {
commitKeyArgs = resolveBucketLink(ozoneManager, commitKeyArgs, auditMap);
volumeName = commitKeyArgs.getVolumeName();
@@ -253,7 +264,16 @@ public class OMKeyCommitRequest extends OMKeyRequest {
if (null == oldKeyVersionsToDeleteMap) {
oldKeyVersionsToDeleteMap = new HashMap<>();
}
- oldKeyVersionsToDeleteMap.put(delKeyName, oldVerKeyInfo);
+
+ // Remove any blocks from oldVerKeyInfo that share the same container ID
+ // and local ID with omKeyInfo's blocks.
+ // Otherwise, it causes data loss once those shared blocks are added
+ // to deletedTable and processed by KeyDeletingService for deletion.
+ filterOutBlocksStillInUse(omKeyInfo, oldVerKeyInfo);
+
+ if (!oldVerKeyInfo.getOmKeyInfoList().isEmpty()) {
+ oldKeyVersionsToDeleteMap.put(delKeyName, oldVerKeyInfo);
+ }
} else {
checkBucketQuotaInNamespace(omBucketInfo, 1L);
checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
@@ -278,6 +298,9 @@ public class OMKeyCommitRequest extends OMKeyRequest {
// Add to cache of open key table and key table.
if (!isHSync) {
+ // If isHSync = false, put a tombstone in the OpenKeyTable cache,
+ // indicating the key is removed from OpenKeyTable,
+ // so that this key can't be committed again.
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
dbOpenKey, trxnLogIndex);
}
@@ -307,6 +330,10 @@ public class OMKeyCommitRequest extends OMKeyRequest {
}
}
+ // Debug logging for any key commit operation, successful or not
+ LOG.debug("Key commit {} with isHSync = {}, omKeyInfo = {}",
+ result == Result.SUCCESS ? "succeeded" : "failed", isHSync, omKeyInfo);
+
if (!isHSync) {
auditLog(auditLogger, buildAuditMessage(OMAction.COMMIT_KEY, auditMap,
exception, getOmRequest().getUserInfo()));
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index fa7f92e9a8..9b4094a381 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.ozone.om.request.key;
import java.util.HashMap;
+
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.AuditLogger;
import org.apache.hadoop.ozone.audit.OMAction;
@@ -60,7 +62,8 @@ import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_L
*/
public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
- private static final Logger LOG =
+ @VisibleForTesting
+ public static final Logger LOG =
LoggerFactory.getLogger(OMKeyCommitRequestWithFSO.class);
public OMKeyCommitRequestWithFSO(OMRequest omRequest,
@@ -98,12 +101,16 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
Result result;
boolean isHSync = commitKeyRequest.hasHsync() &&
commitKeyRequest.getHsync();
+
if (isHSync) {
omMetrics.incNumKeyHSyncs();
} else {
omMetrics.incNumKeyCommits();
}
+ LOG.debug("isHSync = {}, volumeName = {}, bucketName = {}, keyName = {}",
+ isHSync, volumeName, bucketName, keyName);
+
OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
try {
@@ -199,7 +206,16 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
if (null == oldKeyVersionsToDeleteMap) {
oldKeyVersionsToDeleteMap = new HashMap<>();
}
- oldKeyVersionsToDeleteMap.put(delKeyName, oldVerKeyInfo);
+
+ // Remove any blocks from oldVerKeyInfo that share the same container ID
+ // and local ID with omKeyInfo's blocks.
+ // Otherwise, it causes data loss once those shared blocks are added
+ // to deletedTable and processed by KeyDeletingService for deletion.
+ filterOutBlocksStillInUse(omKeyInfo, oldVerKeyInfo);
+
+ if (!oldVerKeyInfo.getOmKeyInfoList().isEmpty()) {
+ oldKeyVersionsToDeleteMap.put(delKeyName, oldVerKeyInfo);
+ }
} else {
checkBucketQuotaInNamespace(omBucketInfo, 1L);
checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
@@ -226,6 +242,9 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
// Add to cache of open key table and key table.
if (!isHSync) {
+ // If isHSync = false, put a tombstone in the OpenKeyTable cache,
+ // indicating the key is removed from OpenKeyTable,
+ // so that this key can't be committed again.
OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager,
dbOpenFileKey, null, fileName, trxnLogIndex);
}
@@ -255,6 +274,10 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
}
}
+ // Debug logging for any key commit operation, successful or not
+ LOG.debug("Key commit {} with isHSync = {}, omKeyInfo = {}",
+ result == Result.SUCCESS ? "succeeded" : "failed", isHSync, omKeyInfo);
+
if (!isHSync) {
auditLog(auditLogger, buildAuditMessage(OMAction.COMMIT_KEY, auditMap,
exception, getOmRequest().getUserInfo()));
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 6d079359ea..13bdbbca3a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -26,12 +26,17 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
@@ -100,7 +105,8 @@ import static org.apache.hadoop.util.Time.monotonicNow;
*/
public abstract class OMKeyRequest extends OMClientRequest {
- private static final Logger LOG = LoggerFactory.getLogger(OMKeyRequest.class);
+ @VisibleForTesting
+ public static final Logger LOG = LoggerFactory.getLogger(OMKeyRequest.class);
private BucketLayout bucketLayout = BucketLayout.DEFAULT;
@@ -844,4 +850,109 @@ public abstract class OMKeyRequest extends OMClientRequest {
pseudoKeyInfo.setKeyLocationVersions(uncommittedGroups);
return pseudoKeyInfo;
}
+
+ /**
+ * Remove blocks in-place from keysToBeFiltered that exist in referenceKey.
+ * <p>
+ * keysToBeFiltered.getOmKeyInfoList() becomes an empty list when all blocks
+ * are filtered out.
+ *
+ * @param referenceKey OmKeyInfo
+ * @param keysToBeFiltered RepeatedOmKeyInfo
+ */
+ protected void filterOutBlocksStillInUse(OmKeyInfo referenceKey,
+ RepeatedOmKeyInfo keysToBeFiltered) {
+
+ LOG.debug("Before block filtering, keysToBeFiltered = {}",
+ keysToBeFiltered);
+
+ // A HashSet for fast lookup. Gathers all ContainerBlockID entries inside
+ // the referenceKey.
+ HashSet<ContainerBlockID> cbIdSet = referenceKey.getKeyLocationVersions()
+ .stream()
+ .flatMap(e -> e.getLocationList().stream())
+ .map(omKeyLocationInfo ->
+ omKeyLocationInfo.getBlockID().getContainerBlockID())
+ .collect(Collectors.toCollection(HashSet::new));
+
+ // Pardon the nested loops. ContainerBlockID is 9 layers deep from:
+ // keysToBeFiltered // Layer 0. RepeatedOmKeyInfo
+ // .getOmKeyInfoList() // 1. List<OmKeyInfo>
+ // .get(0) // 2. OmKeyInfo
+ // .getKeyLocationVersions() // 3. List<OmKeyLocationInfoGroup>
+ // .get(0) // 4. OmKeyLocationInfoGroup
+ // .getLocationVersionMap() // 5. Map<Long, List<OmKeyLocationInfo>>
+ // .get(version) // 6. List<OmKeyLocationInfo>
+ // .get(0) // 7. OmKeyLocationInfo
+ // .getBlockID() // 8. BlockID
+ // .getContainerBlockID(); // 9. ContainerBlockID
+
+ // Using iterator instead of `for` or `forEach` for in-place entry removal
+
+ // Layer 1: List<OmKeyInfo>
+ Iterator<OmKeyInfo> iterOmKeyInfo = keysToBeFiltered
+ .getOmKeyInfoList().iterator();
+
+ while (iterOmKeyInfo.hasNext()) {
+ // Note with HDDS-8462, each RepeatedOmKeyInfo should have only one entry,
+ // so this outermost loop should never be entered twice in each call.
+
+ // But for completeness' sake, multiple entries are handled here anyway.
+ // An OmKeyInfo entry is removed only after all of its blocks are filtered out.
+
+ // Layer 2: OmKeyInfo
+ OmKeyInfo oldOmKeyInfo = iterOmKeyInfo.next();
+ // Layer 3: List<OmKeyLocationInfoGroup>
+ Iterator<OmKeyLocationInfoGroup> iterKeyLocInfoGroup = oldOmKeyInfo
+ .getKeyLocationVersions().iterator();
+ while (iterKeyLocInfoGroup.hasNext()) {
+ // Layer 4: OmKeyLocationInfoGroup
+ OmKeyLocationInfoGroup keyLocInfoGroup = iterKeyLocInfoGroup.next();
+ // Layer 5: Map<Long, List<OmKeyLocationInfo>>
+ Iterator<Map.Entry<Long, List<OmKeyLocationInfo>>> iterVerMap =
+ keyLocInfoGroup.getLocationVersionMap().entrySet().iterator();
+
+ while (iterVerMap.hasNext()) {
+ Map.Entry<Long, List<OmKeyLocationInfo>> mapEntry = iterVerMap.next();
+ // Layer 6: List<OmKeyLocationInfo>
+ List<OmKeyLocationInfo> omKeyLocationInfoList = mapEntry.getValue();
+
+ Iterator<OmKeyLocationInfo> iterKeyLocInfo =
+ omKeyLocationInfoList.iterator();
+ while (iterKeyLocInfo.hasNext()) {
+ // Layer 7: OmKeyLocationInfo
+ OmKeyLocationInfo keyLocationInfo = iterKeyLocInfo.next();
+ // Layer 8: BlockID. Then Layer 9: ContainerBlockID
+ ContainerBlockID cbId = keyLocationInfo
+ .getBlockID().getContainerBlockID();
+
+ if (cbIdSet.contains(cbId)) {
+ // Remove this block from oldVerKeyInfo because it is referenced.
+ iterKeyLocInfo.remove();
+ LOG.debug("Filtered out block: {}", cbId);
+ }
+ }
+
+ // Cleanup when Layer 6 is an empty list
+ if (omKeyLocationInfoList.isEmpty()) {
+ iterVerMap.remove();
+ }
+ }
+
+ // Cleanup when Layer 5 is an empty map
+ if (keyLocInfoGroup.getLocationVersionMap().isEmpty()) {
+ iterKeyLocInfoGroup.remove();
+ }
+ }
+
+ // Cleanup when Layer 3 is an empty list
+ if (oldOmKeyInfo.getKeyLocationVersions().isEmpty()) {
+ iterOmKeyInfo.remove();
+ }
+ }
+
+ // Intentional extra space for alignment
+ LOG.debug("After block filtering, keysToBeFiltered = {}",
+ keysToBeFiltered);
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 17adbd13d0..c8e9b679cf 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -331,4 +331,13 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
public static Logger getLog() {
return LOG;
}
+
+ /**
+ * Wait until both buffers are flushed. This is used in cases like
+ * "follower bootstrap tarball creation" where the rocksDb for the active
+ * fs needs to be synchronized with the RocksDB instances for the snapshots.
+ */
+ public void awaitDoubleBufferFlush() throws InterruptedException {
+ ozoneManagerDoubleBuffer.awaitFlush();
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
index 92a484c919..4c610141b6 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java
@@ -313,8 +313,13 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
omMetadataManager, bucketLayout);
List<KeyLocation> allocatedKeyLocationList = getKeyLocation(10);
+ // hsync should throw OMException
assertThrows(OMException.class, () ->
- performHsyncCommit(allocatedKeyLocationList.subList(0, 5)));
+ doKeyCommit(true, allocatedKeyLocationList.subList(0, 5)));
+
+ // Regular key commit should still work
+ doKeyCommit(false, allocatedKeyLocationList.subList(0, 5));
+
conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
}
@@ -330,26 +335,38 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
.get(bucketKey);
long usedBytes = bucketInfo.getUsedBytes();
- performHsyncCommit(allocatedKeyLocationList.subList(0, 5));
- bucketInfo = omMetadataManager.getBucketTable()
- .get(bucketKey);
+ // 1st commit of 3 blocks, HSync = true
+ Map<String, RepeatedOmKeyInfo> keyToDeleteMap =
+ doKeyCommit(true, allocatedKeyLocationList.subList(0, 3));
+ Assert.assertNull(keyToDeleteMap);
+ bucketInfo = omMetadataManager.getBucketTable().get(bucketKey);
long firstCommitUsedBytes = bucketInfo.getUsedBytes();
- Assert.assertEquals(500, firstCommitUsedBytes - usedBytes);
-
- performHsyncCommit(allocatedKeyLocationList);
- bucketInfo = omMetadataManager.getBucketTable()
- .get(bucketKey);
- long nextCommitUsedBytes = bucketInfo.getUsedBytes();
-
- Assert.assertEquals(1000, nextCommitUsedBytes - usedBytes);
+ Assert.assertEquals(300, firstCommitUsedBytes - usedBytes);
+
+ // 2nd commit of 6 blocks, HSync = true
+ keyToDeleteMap = doKeyCommit(true, allocatedKeyLocationList.subList(0, 6));
+ Assert.assertNull(keyToDeleteMap);
+ bucketInfo = omMetadataManager.getBucketTable().get(bucketKey);
+ long secondCommitUsedBytes = bucketInfo.getUsedBytes();
+ Assert.assertEquals(600, secondCommitUsedBytes - usedBytes);
+
+ // 3rd and final commit of all 10 blocks, HSync = false
+ keyToDeleteMap = doKeyCommit(false, allocatedKeyLocationList);
+ // keyToDeleteMap should be empty because none of the previous blocks
+ // should be deleted.
+ Assert.assertNotNull(keyToDeleteMap);
+ Assert.assertTrue(keyToDeleteMap.isEmpty());
+ bucketInfo = omMetadataManager.getBucketTable().get(bucketKey);
+ long thirdCommitUsedBytes = bucketInfo.getUsedBytes();
+ Assert.assertEquals(1000, thirdCommitUsedBytes - usedBytes);
}
-
- private List<KeyLocation> performHsyncCommit(
+
+ private Map<String, RepeatedOmKeyInfo> doKeyCommit(boolean isHSync,
List<KeyLocation> keyLocations) throws Exception {
// allocated block list
dataSize = keyLocations.size() * 100;
OMRequest modifiedOmRequest = doPreExecute(createCommitKeyRequest(
- keyLocations, true));
+ keyLocations, isHSync));
OMKeyCommitRequest omKeyCommitRequest =
getOmKeyCommitRequest(modifiedOmRequest);
@@ -365,16 +382,22 @@ public class TestOMKeyCommitRequest extends TestOMKeyRequest {
Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
omClientResponse.getOMResponse().getStatus());
- // key must be prsent in both open key table and key table for hsync
+ // Key should be present in both OpenKeyTable and KeyTable with HSync commit
OmKeyInfo omKeyInfo =
omMetadataManager.getOpenKeyTable(
omKeyCommitRequest.getBucketLayout()).get(openKey);
- Assert.assertNotNull(omKeyInfo);
+ if (isHSync) {
+ Assert.assertNotNull(omKeyInfo);
+ } else {
+ // Key should not exist in OpenKeyTable anymore with non-HSync commit
+ Assert.assertNull(omKeyInfo);
+ }
omKeyInfo =
omMetadataManager.getKeyTable(omKeyCommitRequest.getBucketLayout())
.get(ozoneKey);
Assert.assertNotNull(omKeyInfo);
- return keyLocations;
+
+ return ((OMKeyCommitResponse) omClientResponse).getKeysToDelete();
}
@Test
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index 37ad06f5d7..0954382b78 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
import org.apache.hadoop.ozone.security.acl.OzoneNativeAuthorizer;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ozone.test.GenericTestUtils;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Assert;
@@ -73,6 +74,7 @@ import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.ScmClient;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
import org.apache.hadoop.util.Time;
+import org.slf4j.event.Level;
import static org.apache.hadoop.ozone.om.request.OMRequestTestUtils.setupReplicationConfigValidation;
import static org.mockito.ArgumentMatchers.any;
@@ -231,6 +233,11 @@ public class TestOMKeyRequest {
OmSnapshotManager omSnapshotManager = new OmSnapshotManager(ozoneManager);
when(ozoneManager.getOmSnapshotManager())
.thenReturn(omSnapshotManager);
+
+ // Enable DEBUG level logging for relevant classes
+ GenericTestUtils.setLogLevel(OMKeyRequest.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(OMKeyCommitRequest.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(OMKeyCommitRequestWithFSO.LOG, Level.DEBUG);
}
@NotNull
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]