This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new dce2e5e8c0d HDDS-13187. Extend Recon events handling to
MultipartInfoTable (#9250)
dce2e5e8c0d is described below
commit dce2e5e8c0d9af02f37663cf667f2a3d97d0fb46
Author: Priyesh Karatha <[email protected]>
AuthorDate: Fri Nov 7 07:24:20 2025 +0530
HDDS-13187. Extend Recon events handling to MultipartInfoTable (#9250)
Co-authored-by: tanvipenumudy
<[email protected]>
---
.../ozone/recon/api/OMDBInsightEndpoint.java | 55 +++++++
.../recon/tasks/MultipartInfoInsightHandler.java | 173 +++++++++++++++++++++
.../ozone/recon/tasks/OmTableInsightTask.java | 2 +
.../ozone/recon/tasks/TestOmTableInsightTask.java | 159 ++++++++++++++++++-
4 files changed, 384 insertions(+), 5 deletions(-)
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
index ae8e9bdb0b2..57cac7ec23c 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
@@ -21,6 +21,7 @@
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static
org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_DIR_TABLE;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_TABLE;
+import static
org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_INFO_TABLE;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_FILE_TABLE;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_KEY_TABLE;
import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_FETCH_COUNT;
@@ -377,6 +378,60 @@ private Long getValueFromId(GlobalStatsValue record) {
return record != null ? record.getValue() : 0L;
}
+ /**
+ * Retrieves the summary of open MPU keys.
+ *
+ * @return The HTTP response body includes a map with the following entries:
+ * - "totalOpenMPUKeys": the total number of open MPU keys
+ * - "totalReplicatedDataSize": the total replicated size for open MPU keys
+ * - "totalUnreplicatedDataSize": the total unreplicated size for open MPU
keys
+ *
+ * Example response:
+ * {
+ * "totalOpenMPUKeys": 2,
+ * "totalReplicatedDataSize": 90000,
+ * "totalDataSize": 30000
+ * }
+ */
+ @GET
+ @Path("/open/mpu/summary")
+ public Response getOpenMPUKeySummary() {
+ // Create a HashMap for the keysSummary
+ Map<String, Long> keysSummary = new HashMap<>();
+ // Create a keys summary for open MPU keys
+ createKeysSummaryForOpenMPUKey(keysSummary);
+ return Response.ok(keysSummary).build();
+ }
+
+ /**
+ * Creates a keys summary for open MPU keys and updates the provided
keysSummary map.
+ * Calculates the total number of open keys, replicated data size, and
unreplicated data size.
+ *
+ * @param keysSummary A map to store the keys summary information.
+ */
+ private void createKeysSummaryForOpenMPUKey(Map<String, Long> keysSummary) {
+ try {
+ Long replicatedSizeOpenMPUKey =
getValueFromId(reconGlobalStatsManager.getGlobalStatsValue(
+
OmTableInsightTask.getReplicatedSizeKeyFromTable(MULTIPART_INFO_TABLE)));
+ Long unreplicatedSizeOpenMPUKey =
getValueFromId(reconGlobalStatsManager.getGlobalStatsValue(
+
OmTableInsightTask.getUnReplicatedSizeKeyFromTable(MULTIPART_INFO_TABLE)));
+ Long openMPUKeyCount =
getValueFromId(reconGlobalStatsManager.getGlobalStatsValue(
+ OmTableInsightTask.getTableCountKeyFromTable(MULTIPART_INFO_TABLE)));
+ // Calculate the total number of open MPU keys
+ keysSummary.put("totalOpenMPUKeys", openMPUKeyCount);
+ // Calculate the total replicated and unreplicated sizes of open MPU keys
+ keysSummary.put("totalReplicatedDataSize", replicatedSizeOpenMPUKey);
+ keysSummary.put("totalDataSize", unreplicatedSizeOpenMPUKey);
+ } catch (IOException ex) {
+ LOG.error("Error retrieving open mpu key summary from RocksDB", ex);
+ // Return zeros in case of error
+ keysSummary.put("totalOpenMPUKeys", 0L);
+ // Calculate the total replicated and unreplicated sizes of open MPU keys
+ keysSummary.put("totalReplicatedDataSize", 0L);
+ keysSummary.put("totalDataSize", 0L);
+ }
+ }
+
/** Retrieves the summary of deleted keys.
*
* This method calculates and returns a summary of deleted keys.
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/MultipartInfoInsightHandler.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/MultipartInfoInsightHandler.java
new file mode 100644
index 00000000000..2f501a33ad9
--- /dev/null
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/MultipartInfoInsightHandler.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages records in the MultipartInfo Table, updating counts and sizes of
+ * multipart upload keys in the backend.
+ */
+public class MultipartInfoInsightHandler implements OmTableHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MultipartInfoInsightHandler.class);
+
+ /**
+ * Invoked by the process method to add information on those keys that have
+ * been initiated for multipart upload in the backend.
+ */
+ @Override
+ public void handlePutEvent(OMDBUpdateEvent<String, Object> event, String
tableName, Map<String, Long> objectCountMap,
+ Map<String, Long> unReplicatedSizeMap, Map<String, Long>
replicatedSizeMap) {
+
+ if (event.getValue() != null) {
+ OmMultipartKeyInfo multipartKeyInfo = (OmMultipartKeyInfo)
event.getValue();
+ objectCountMap.computeIfPresent(getTableCountKeyFromTable(tableName),
+ (k, count) -> count + 1L);
+
+ for (PartKeyInfo partKeyInfo : multipartKeyInfo.getPartKeyInfoMap()) {
+ OmKeyInfo omKeyInfo =
OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+
unReplicatedSizeMap.computeIfPresent(getUnReplicatedSizeKeyFromTable(tableName),
+ (k, size) -> size + omKeyInfo.getDataSize());
+
replicatedSizeMap.computeIfPresent(getReplicatedSizeKeyFromTable(tableName),
+ (k, size) -> size + omKeyInfo.getReplicatedSize());
+ }
+ } else {
+ LOG.warn("Put event does not have the Multipart Key Info for {}.",
event.getKey());
+ }
+ }
+
+ /**
+ * Invoked by the process method to delete information on those multipart
uploads that
+ * have been completed or aborted in the backend.
+ */
+ @Override
+ public void handleDeleteEvent(OMDBUpdateEvent<String, Object> event, String
tableName,
+ Map<String, Long> objectCountMap, Map<String, Long> unReplicatedSizeMap,
Map<String, Long> replicatedSizeMap) {
+
+ if (event.getValue() != null) {
+ OmMultipartKeyInfo multipartKeyInfo = (OmMultipartKeyInfo)
event.getValue();
+ objectCountMap.computeIfPresent(getTableCountKeyFromTable(tableName),
+ (k, count) -> count > 0 ? count - 1L : 0L);
+
+ for (PartKeyInfo partKeyInfo : multipartKeyInfo.getPartKeyInfoMap()) {
+ OmKeyInfo omKeyInfo =
OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+
unReplicatedSizeMap.computeIfPresent(getUnReplicatedSizeKeyFromTable(tableName),
+ (k, size) -> {
+ long newSize = size > omKeyInfo.getDataSize() ? size -
omKeyInfo.getDataSize() : 0L;
+ if (newSize < 0) {
+ LOG.warn("Negative unreplicated size for key: {}. Original:
{}, Part: {}",
+ k, size, omKeyInfo.getDataSize());
+ }
+ return newSize;
+ });
+
replicatedSizeMap.computeIfPresent(getReplicatedSizeKeyFromTable(tableName),
+ (k, size) -> {
+ long newSize = size > omKeyInfo.getReplicatedSize() ? size -
omKeyInfo.getReplicatedSize() : 0L;
+ if (newSize < 0) {
+ LOG.warn("Negative replicated size for key: {}. Original: {},
Part: {}",
+ k, size, omKeyInfo.getReplicatedSize());
+ }
+ return newSize;
+ });
+ }
+ } else {
+ LOG.warn("Delete event does not have the Multipart Key Info for {}.",
event.getKey());
+ }
+ }
+
+ /**
+ * Invoked by the process method to update information on those multipart
uploads that
+ * have been updated in the backend.
+ */
+ @Override
+ public void handleUpdateEvent(OMDBUpdateEvent<String, Object> event, String
tableName,
+ Map<String, Long> objectCountMap, Map<String, Long> unReplicatedSizeMap,
Map<String, Long> replicatedSizeMap) {
+
+ if (event.getValue() != null) {
+ if (event.getOldValue() == null) {
+ LOG.warn("Update event does not have the old Multipart Key Info for
{}.", event.getKey());
+ return;
+ }
+
+ // In Update event the count for the multipart info table will not
change. So we
+ // don't need to update the count.
+ OmMultipartKeyInfo oldMultipartKeyInfo = (OmMultipartKeyInfo)
event.getOldValue();
+ OmMultipartKeyInfo newMultipartKeyInfo = (OmMultipartKeyInfo)
event.getValue();
+
+ // Calculate old sizes
+ for (PartKeyInfo partKeyInfo : oldMultipartKeyInfo.getPartKeyInfoMap()) {
+ OmKeyInfo omKeyInfo =
OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+
unReplicatedSizeMap.computeIfPresent(getUnReplicatedSizeKeyFromTable(tableName),
+ (k, size) -> size - omKeyInfo.getDataSize());
+
replicatedSizeMap.computeIfPresent(getReplicatedSizeKeyFromTable(tableName),
+ (k, size) -> size - omKeyInfo.getReplicatedSize());
+ }
+
+ // Calculate new sizes
+ for (PartKeyInfo partKeyInfo : newMultipartKeyInfo.getPartKeyInfoMap()) {
+ OmKeyInfo omKeyInfo =
OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+
unReplicatedSizeMap.computeIfPresent(getUnReplicatedSizeKeyFromTable(tableName),
+ (k, size) -> size + omKeyInfo.getDataSize());
+
replicatedSizeMap.computeIfPresent(getReplicatedSizeKeyFromTable(tableName),
+ (k, size) -> size + omKeyInfo.getReplicatedSize());
+ }
+ } else {
+ LOG.warn("Update event does not have the Multipart Key Info for {}.",
event.getKey());
+ }
+ }
+
+ /**
+ * This method is called by the reprocess method. It calculates the record
+ * counts for the multipart info table. Additionally, it computes the sizes
+ * of both replicated and unreplicated parts that are currently in multipart
+ * uploads in the backend.
+ */
+ @Override
+ public Triple<Long, Long, Long> getTableSizeAndCount(
+ TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator) {
+ long count = 0;
+ long unReplicatedSize = 0;
+ long replicatedSize = 0;
+
+ if (iterator != null) {
+ while (iterator.hasNext()) {
+ Table.KeyValue<String, ?> kv = iterator.next();
+ if (kv != null && kv.getValue() != null) {
+ OmMultipartKeyInfo multipartKeyInfo = (OmMultipartKeyInfo)
kv.getValue();
+ for (PartKeyInfo partKeyInfo : multipartKeyInfo.getPartKeyInfoMap())
{
+ OmKeyInfo omKeyInfo =
OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+ unReplicatedSize += omKeyInfo.getDataSize();
+ replicatedSize += omKeyInfo.getReplicatedSize();
+ }
+ count++;
+ }
+ }
+ }
+ return Triple.of(count, unReplicatedSize, replicatedSize);
+ }
+}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
index 0ea225e12fb..341912b5d2e 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.recon.tasks;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_TABLE;
+import static
org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_INFO_TABLE;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_FILE_TABLE;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_KEY_TABLE;
@@ -70,6 +71,7 @@ public OmTableInsightTask(ReconGlobalStatsManager
reconGlobalStatsManager,
tableHandlers.put(OPEN_KEY_TABLE, new OpenKeysInsightHandler());
tableHandlers.put(OPEN_FILE_TABLE, new OpenKeysInsightHandler());
tableHandlers.put(DELETED_TABLE, new DeletedKeysInsightHandler());
+ tableHandlers.put(MULTIPART_INFO_TABLE, new MultipartInfoInsightHandler());
}
@Override
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
index 659ece4b398..faa158bfab3 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
@@ -21,6 +21,7 @@
import static
org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_DIR_TABLE;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_TABLE;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;
+import static
org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_INFO_TABLE;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_FILE_TABLE;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_KEY_TABLE;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.VOLUME_TABLE;
@@ -47,7 +48,9 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.utils.db.Table;
@@ -57,14 +60,18 @@
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
import org.apache.hadoop.ozone.recon.ReconTestInjector;
import org.apache.hadoop.ozone.recon.api.types.NSSummary;
import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconGlobalStatsManager;
import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl;
+import org.apache.hadoop.util.Time;
import org.jooq.DSLContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -357,6 +364,16 @@ public void testReprocessForCount() throws Exception {
when(keyInfo.getOmKeyInfoList()).thenReturn(
Arrays.asList(mock(OmKeyInfo.class)));
when(mockKeyValue.getValue()).thenReturn(keyInfo);
+ } else if (tableName.equals(MULTIPART_INFO_TABLE)) {
+ String uploadID = UUID.randomUUID().toString();
+ OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo.Builder()
+ .setUploadID(uploadID)
+ .build();
+ PartKeyInfo partKeyInfo =
+ createPartKeyInfo(UUID.randomUUID().toString(),
UUID.randomUUID().toString(), UUID.randomUUID().toString(),
+ uploadID, 1, 100L);
+ multipartKeyInfo.addPartKeyInfo(partKeyInfo);
+ when(mockKeyValue.getValue()).thenReturn(multipartKeyInfo);
} else {
when(mockKeyValue.getValue()).thenReturn(mock(OmKeyInfo.class));
}
@@ -373,6 +390,7 @@ public void testReprocessForCount() throws Exception {
assertEquals(5L, getCountForTable(BUCKET_TABLE));
assertEquals(5L, getCountForTable(OPEN_KEY_TABLE));
assertEquals(5L, getCountForTable(DELETED_TABLE));
+ assertEquals(5L, getCountForTable(MULTIPART_INFO_TABLE));
}
@Test
@@ -443,8 +461,8 @@ public void testProcessForCount() {
// Creating events for each table except the deleted table
for (String tableName : omTableInsightTask.getTaskTables()) {
- if (tableName.equals(DELETED_TABLE)) {
- continue; // Skipping deleted table as it has a separate test
+ if (tableName.equals(DELETED_TABLE) ||
tableName.equals(MULTIPART_INFO_TABLE)) {
+ continue; // Skipping deleted and multipartInfo tables as they have
separate tests
}
// Adding 5 PUT events per table
@@ -471,7 +489,7 @@ public void testProcessForCount() {
// Verifying the count in each table
for (String tableName : omTableInsightTask.getTaskTables()) {
- if (tableName.equals(DELETED_TABLE)) {
+ if (tableName.equals(DELETED_TABLE) ||
tableName.equals(MULTIPART_INFO_TABLE)) {
continue;
}
assertEquals(4L, getCountForTable(
@@ -481,7 +499,7 @@ public void testProcessForCount() {
List<OMDBUpdateEvent> additionalEvents = new ArrayList<>();
// Simulating new PUT and DELETE events
for (String tableName : omTableInsightTask.getTaskTables()) {
- if (tableName.equals(DELETED_TABLE)) {
+ if (tableName.equals(DELETED_TABLE) ||
tableName.equals(MULTIPART_INFO_TABLE)) {
continue;
}
// Adding 1 new PUT event
@@ -499,7 +517,7 @@ public void testProcessForCount() {
omTableInsightTask.process(additionalBatch, Collections.emptyMap());
// Verifying the final count in each table
for (String tableName : omTableInsightTask.getTaskTables()) {
- if (tableName.equals(DELETED_TABLE)) {
+ if (tableName.equals(DELETED_TABLE) ||
tableName.equals(MULTIPART_INFO_TABLE)) {
continue;
}
// 5 items expected after processing the additional events.
@@ -627,6 +645,137 @@ public void testProcessForDeletedTable() {
assertEquals(12000L, getReplicatedSizeForTable(DELETED_TABLE));
}
+ @Test
+ public void testProcessForMultipartInfoTable() {
+ // Prepare 5 MPU key PUT events.
+ ArrayList<OMDBUpdateEvent> putEvents = new ArrayList<>();
+ String[] multipartKeys = new String[5];
+ OmMultipartKeyInfo[] mpuInfos = new OmMultipartKeyInfo[5];
+ String uploadID = UUID.randomUUID().toString();
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ for (int i = 0; i < 5; i++) {
+ OmMultipartKeyInfo mpu = new OmMultipartKeyInfo.Builder()
+ .setObjectID(i + 1)
+ .setUploadID(uploadID)
+ .setCreationTime(Time.now())
+ .setReplicationConfig(RatisReplicationConfig.getInstance(
+ HddsProtos.ReplicationFactor.THREE))
+ .build();
+
+ // Each MPU has 2 parts, each part is 100 bytes.
+ mpu.addPartKeyInfo(createPartKeyInfo(volumeName, bucketName, keyName,
uploadID, 1, 100L));
+ mpu.addPartKeyInfo(createPartKeyInfo(volumeName, bucketName, keyName,
uploadID, 2, 100L));
+ String multipartKey = reconOMMetadataManager.getMultipartKey(volumeName,
bucketName, keyName, uploadID);
+ multipartKeys[i] = multipartKey;
+ mpuInfos[i] = mpu;
+ putEvents.add(getOMUpdateEvent(multipartKey, mpu, MULTIPART_INFO_TABLE,
PUT, null));
+ }
+ OMUpdateEventBatch putBatch = new OMUpdateEventBatch(putEvents, 0L);
+ omTableInsightTask.process(putBatch, Collections.emptyMap());
+
+ // After 5 MPU key PUTs, each with 2 parts of 100 bytes, total
unreplicated size = 5 * 2 * 100 bytes = 1000 bytes.
+ // Replicated size (with RATIS THREE replication) = 1000 bytes * 3 = 3000
bytes.
+ assertEquals(5L, getCountForTable(MULTIPART_INFO_TABLE));
+ assertEquals(1000L, getUnReplicatedSizeForTable(MULTIPART_INFO_TABLE));
+ assertEquals(3000L, getReplicatedSizeForTable(MULTIPART_INFO_TABLE));
+
+ // DELETE the last MPU key.
+ ArrayList<OMDBUpdateEvent> deleteEvents = new ArrayList<>();
+ deleteEvents.add(getOMUpdateEvent(multipartKeys[4], mpuInfos[4],
MULTIPART_INFO_TABLE, DELETE, null));
+ OMUpdateEventBatch deleteBatch = new OMUpdateEventBatch(deleteEvents, 0L);
+ omTableInsightTask.process(deleteBatch, Collections.emptyMap());
+
+ // After DELETE: 4 MPU keys left, 4 * 2 * 100 = 800 bytes unreplicated
size, 800 bytes * 3 = 2400 bytes
+ // replicated size.
+ assertEquals(4L, getCountForTable(MULTIPART_INFO_TABLE));
+ assertEquals(800L, getUnReplicatedSizeForTable(MULTIPART_INFO_TABLE));
+ assertEquals(2400L, getReplicatedSizeForTable(MULTIPART_INFO_TABLE));
+
+ // UPDATE the first MPU key: change part 1 to 200 bytes, part 2 stays 100
bytes.
+ OmMultipartKeyInfo newMpu = new OmMultipartKeyInfo.Builder()
+ .setObjectID(1L)
+ .setUploadID(uploadID)
+ .setCreationTime(Time.now())
+ .setReplicationConfig(RatisReplicationConfig.getInstance(
+ HddsProtos.ReplicationFactor.THREE))
+ .build();
+
+ newMpu.addPartKeyInfo(createPartKeyInfo(volumeName, bucketName, keyName,
uploadID, 1, 200L));
+ newMpu.addPartKeyInfo(createPartKeyInfo(volumeName, bucketName, keyName,
uploadID, 2, 100L));
+
+ ArrayList<OMDBUpdateEvent> updateEvents = new ArrayList<>();
+ updateEvents.add(getOMUpdateEvent(multipartKeys[0], newMpu,
MULTIPART_INFO_TABLE, UPDATE, mpuInfos[0]));
+ OMUpdateEventBatch updateBatch = new OMUpdateEventBatch(updateEvents, 0L);
+ omTableInsightTask.process(updateBatch, Collections.emptyMap());
+
+ // After UPDATE: 3 MPU keys unchanged (2*100 bytes each), 1 MPU with
200+100 bytes.
+ // Total unreplicated size = 3*2*100 + 200+100 = 600+300 = 900 bytes.
+ // Total replicated size (with RATIS THREE replication) = 900 * 3 = 2700
bytes.
+ assertEquals(4L, getCountForTable(MULTIPART_INFO_TABLE));
+ assertEquals(900L, getUnReplicatedSizeForTable(MULTIPART_INFO_TABLE));
+ assertEquals(2700L, getReplicatedSizeForTable(MULTIPART_INFO_TABLE));
+ }
+
+ @Test
+ public void testReprocessForMultipartInfoTable() throws Exception {
+ String uploadID = UUID.randomUUID().toString();
+ OmMultipartKeyInfo omMultipartKeyInfo = new OmMultipartKeyInfo.Builder()
+ .setObjectID(1L)
+ .setUploadID(uploadID)
+ .setCreationTime(Time.now())
+ .setReplicationConfig(RatisReplicationConfig.getInstance(
+ HddsProtos.ReplicationFactor.THREE))
+ .build();
+
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ PartKeyInfo part1 = createPartKeyInfo(volumeName, bucketName, keyName,
uploadID, 1, 100L);
+ omMultipartKeyInfo.addPartKeyInfo(part1);
+
+ PartKeyInfo part2 = createPartKeyInfo(volumeName, bucketName, keyName,
uploadID, 2, 100L);
+ omMultipartKeyInfo.addPartKeyInfo(part2);
+
+ PartKeyInfo part3 = createPartKeyInfo(volumeName, bucketName, keyName,
uploadID, 3, 100L);
+ omMultipartKeyInfo.addPartKeyInfo(part3);
+
+ String multipartKey = reconOMMetadataManager.getMultipartKey(volumeName,
bucketName, keyName, uploadID);
+ reconOMMetadataManager.getMultipartInfoTable().put(multipartKey,
omMultipartKeyInfo);
+
+ ReconOmTask.TaskResult result =
omTableInsightTask.reprocess(reconOMMetadataManager);
+ assertTrue(result.isTaskSuccess());
+
+ assertEquals(1L, getCountForTable(MULTIPART_INFO_TABLE));
+ // each MPU part size is 100 bytes * 3 MPU parts = 300 bytes.
+ assertEquals(300L, getUnReplicatedSizeForTable(MULTIPART_INFO_TABLE));
+ // each MPU part is replicated using RATIS THREE, total replicated size =
300 bytes * 3 = 900 bytes.
+ assertEquals(900L, getReplicatedSizeForTable(MULTIPART_INFO_TABLE));
+ }
+
+ public PartKeyInfo createPartKeyInfo(String volumeName, String bucketName,
+ String keyName, String uploadID, int
partNumber, long dataSize) {
+ return PartKeyInfo.newBuilder()
+ .setPartNumber(partNumber)
+ .setPartName(reconOMMetadataManager.getMultipartKey(volumeName,
+ bucketName, keyName, uploadID))
+ .setPartKeyInfo(KeyInfo.newBuilder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setKeyName(keyName)
+ .setDataSize(dataSize)
+ .setCreationTime(Time.now())
+ .setModificationTime(Time.now())
+ .setObjectID(UUID.randomUUID().hashCode())
+ .setType(HddsProtos.ReplicationType.RATIS)
+ .setFactor(HddsProtos.ReplicationFactor.THREE)
+ .build())
+ .build();
+ }
+
private OMDBUpdateEvent getOMUpdateEvent(
String name, Object value,
String table,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]