This is an automated email from the ASF dual-hosted git repository.
devesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e0bd2cc716 HDDS-12185. Enhance FileSizeCountTask for Faster Processing. (#7796)
e0bd2cc716 is described below
commit e0bd2cc71621087e41cbc0b79f72b6487d271264
Author: Arafat2198 <[email protected]>
AuthorDate: Fri Feb 21 11:17:29 2025 +0530
HDDS-12185. Enhance FileSizeCountTask for Faster Processing. (#7796)
---
.../apache/hadoop/ozone/recon/ReconConstants.java | 13 +
.../hadoop/ozone/recon/ReconControllerModule.java | 14 +-
.../ozone/recon/tasks/FileSizeCountTaskFSO.java | 70 +++++
.../ozone/recon/tasks/FileSizeCountTaskHelper.java | 331 +++++++++++++++++++++
.../ozone/recon/tasks/FileSizeCountTaskOBS.java | 70 +++++
.../ozone/recon/tasks/ReconTaskControllerImpl.java | 3 +
.../hadoop/ozone/recon/api/TestEndpoints.java | 64 +++-
.../ozone/recon/tasks/TestFileSizeCountTask.java | 258 +++++++++-------
8 files changed, 695 insertions(+), 128 deletions(-)
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
index 22b9d969bc..ecd88f8099 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
@@ -17,6 +17,8 @@
package org.apache.hadoop.ozone.recon;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
* Recon Server constants file.
*/
@@ -89,4 +91,15 @@ private ReconConstants() {
(double) MAX_CONTAINER_SIZE_UPPER_BOUND /
MIN_CONTAINER_SIZE_UPPER_BOUND) /
Math.log(2)) + 1;
+
+ // For file-size count reprocessing: ensure only one task truncates the table.
+ public static final AtomicBoolean FILE_SIZE_COUNT_TABLE_TRUNCATED = new AtomicBoolean(false);
+
+ /**
+ * Resets the table-truncated flag. This should be called once per reprocess cycle,
+ * for example by the OM task controller, before the tasks run.
+ */
+ public static void resetTableTruncatedFlags() {
+ FILE_SIZE_COUNT_TABLE_TRUNCATED.set(false);
+ }
}
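The AtomicBoolean above is the only coordination point between the two new file-size tasks during reprocess: the controller clears it once per cycle, and whichever task reaches the truncation step first wins the compareAndSet, so FILE_COUNT_BY_SIZE is emptied exactly once. An illustrative, self-contained sketch of that handshake (hypothetical class and method names, not code from this commit):

import java.util.concurrent.atomic.AtomicBoolean;

public class TruncateOnceSketch {
  static final AtomicBoolean TRUNCATED = new AtomicBoolean(false);

  // Called once by the controller before the tasks run.
  static void resetFlags() {
    TRUNCATED.set(false);
  }

  // Each task calls this; only the first caller actually truncates.
  static void truncateIfNeeded(Runnable truncate) {
    if (TRUNCATED.compareAndSet(false, true)) {
      truncate.run();
    }
  }

  public static void main(String[] args) {
    resetFlags();
    truncateIfNeeded(() -> System.out.println("table truncated")); // runs
    truncateIfNeeded(() -> System.out.println("table truncated")); // skipped
  }
}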
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index 42ab9a4716..127cbfcf9e 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -30,6 +30,8 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
@@ -57,7 +59,8 @@
import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl;
import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
-import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
+import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskFSO;
+import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskOBS;
import org.apache.hadoop.ozone.recon.tasks.NSSummaryTask;
import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask;
import org.apache.hadoop.ozone.recon.tasks.ReconOmTask;
@@ -129,12 +132,19 @@ protected void configure() {
Multibinder<ReconOmTask> taskBinder =
Multibinder.newSetBinder(binder(), ReconOmTask.class);
taskBinder.addBinding().to(ContainerKeyMapperTask.class);
- taskBinder.addBinding().to(FileSizeCountTask.class);
+ taskBinder.addBinding().to(FileSizeCountTaskFSO.class);
+ taskBinder.addBinding().to(FileSizeCountTaskOBS.class);
taskBinder.addBinding().to(OmTableInsightTask.class);
taskBinder.addBinding().to(NSSummaryTask.class);
}
}
+ @Provides
+ @Singleton
+ public ExecutorService provideReconExecutorService() {
+ return Executors.newFixedThreadPool(5);
+ }
+
/**
* Class that has all the DAO bindings in Recon.
*/
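The new ExecutorService binding provides a fixed pool of five threads that the task framework can use to run the registered ReconOmTask instances concurrently, which is what lets the OBS and FSO file-size tasks proceed in parallel. A rough, self-contained sketch of that fan-out pattern (stand-in callables instead of the real reprocess calls; the actual dispatch logic is not part of this hunk):

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelReprocessSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    // Stand-ins for fileSizeCountTaskOBS.reprocess(...) and fileSizeCountTaskFSO.reprocess(...).
    List<Callable<Boolean>> calls = Arrays.asList(() -> true, () -> true);
    for (Future<Boolean> f : pool.invokeAll(calls)) {
      System.out.println("task succeeded: " + f.get());
    }
    pool.shutdown();
  }
}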
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java
new file mode 100644
index 0000000000..f40a859590
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import com.google.inject.Inject;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
+import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
+import org.jooq.DSLContext;
+
+/**
+ * Task for FileSystemOptimized (FSO) which processes the FILE_TABLE.
+ */
+public class FileSizeCountTaskFSO implements ReconOmTask {
+
+ private final FileCountBySizeDao fileCountBySizeDao;
+ private final DSLContext dslContext;
+
+ @Inject
+ public FileSizeCountTaskFSO(FileCountBySizeDao fileCountBySizeDao,
+ UtilizationSchemaDefinition utilizationSchemaDefinition) {
+ this.fileCountBySizeDao = fileCountBySizeDao;
+ this.dslContext = utilizationSchemaDefinition.getDSLContext();
+ }
+
+ @Override
+ public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+ return FileSizeCountTaskHelper.reprocess(
+ omMetadataManager,
+ dslContext,
+ fileCountBySizeDao,
+ BucketLayout.FILE_SYSTEM_OPTIMIZED,
+ getTaskName()
+ );
+ }
+
+ @Override
+ public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+ // This task listens only on the FILE_TABLE.
+ return FileSizeCountTaskHelper.processEvents(
+ events,
+ OmMetadataManagerImpl.FILE_TABLE,
+ dslContext,
+ fileCountBySizeDao,
+ getTaskName());
+ }
+
+ @Override
+ public String getTaskName() {
+ return "FileSizeCountTaskFSO";
+ }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
new file mode 100644
index 0000000000..489449d6a9
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.recon.ReconConstants;
+import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
+import org.jooq.DSLContext;
+import org.jooq.Record3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class that encapsulates the common code for file size count tasks.
+ */
+public abstract class FileSizeCountTaskHelper {
+ protected static final Logger LOG = LoggerFactory.getLogger(FileSizeCountTaskHelper.class);
+
+ // Static lock to guard table truncation.
+ private static final Object TRUNCATE_LOCK = new Object();
+
+ /**
+ * Truncates the FILE_COUNT_BY_SIZE table if it has not been truncated yet.
+ * This method synchronizes on a static lock to ensure only one task truncates at a time.
+ * If an error occurs, the flag is reset to allow retrying the truncation.
+ *
+ * @param dslContext DSLContext for executing DB commands.
+ */
+ public static void truncateTableIfNeeded(DSLContext dslContext) {
+ synchronized (TRUNCATE_LOCK) {
+ if (ReconConstants.FILE_SIZE_COUNT_TABLE_TRUNCATED.compareAndSet(false, true)) {
+ try {
+ int execute = dslContext.delete(FILE_COUNT_BY_SIZE).execute();
+ LOG.info("Deleted {} records from {}", execute, FILE_COUNT_BY_SIZE);
+ } catch (Exception e) {
+ // Reset the flag so that truncation can be retried
+ ReconConstants.FILE_SIZE_COUNT_TABLE_TRUNCATED.set(false);
+ LOG.error("Error while truncating FILE_COUNT_BY_SIZE table, resetting flag.", e);
+ throw new RuntimeException("Table truncation failed", e); // Propagate upwards
+ }
+ } else {
+ LOG.info("Table already truncated by another task; waiting for truncation to complete.");
+ }
+ }
+ }
+
+ /**
+ * Executes the reprocess method for the given task.
+ *
+ * @param omMetadataManager OM metadata manager.
+ * @param dslContext DSLContext for DB operations.
+ * @param fileCountBySizeDao DAO for file count table.
+ * @param bucketLayout The bucket layout to process.
+ * @param taskName The name of the task for logging.
+ * @return A Pair of task name and boolean indicating success.
+ */
+ public static Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager,
+ DSLContext dslContext,
+ FileCountBySizeDao fileCountBySizeDao,
+ BucketLayout bucketLayout,
+ String taskName) {
+ LOG.info("Starting Reprocess for {}", taskName);
+ Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
+ long startTime = System.currentTimeMillis();
+ truncateTableIfNeeded(dslContext);
+ boolean status = reprocessBucketLayout(
+ bucketLayout, omMetadataManager, fileSizeCountMap, dslContext, fileCountBySizeDao, taskName);
+ if (!status) {
+ return new ImmutablePair<>(taskName, false);
+ }
+ writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao);
+ long endTime = System.currentTimeMillis();
+ LOG.info("{} completed Reprocess in {} ms.", taskName, (endTime -
startTime));
+ return new ImmutablePair<>(taskName, true);
+ }
+
+ /**
+ * Iterates over the OM DB keys for the given bucket layout and updates the fileSizeCountMap.
+ *
+ * @param bucketLayout The bucket layout to use.
+ * @param omMetadataManager OM metadata manager.
+ * @param fileSizeCountMap Map accumulating file size counts.
+ * @param dslContext DSLContext for DB operations.
+ * @param fileCountBySizeDao DAO for file count table.
+ * @param taskName The name of the task for logging.
+ * @return true if processing succeeds, false otherwise.
+ */
+ public static boolean reprocessBucketLayout(BucketLayout bucketLayout,
+ OMMetadataManager omMetadataManager,
+ Map<FileSizeCountKey, Long> fileSizeCountMap,
+ DSLContext dslContext,
+ FileCountBySizeDao fileCountBySizeDao,
+ String taskName) {
+ Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable(bucketLayout);
+ int totalKeysProcessed = 0;
+ try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyIter =
+ omKeyInfoTable.iterator()) {
+ while (keyIter.hasNext()) {
+ Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
+ handlePutKeyEvent(kv.getValue(), fileSizeCountMap);
+ totalKeysProcessed++;
+
+ // Flush to DB periodically.
+ if (fileSizeCountMap.size() >= 100000) {
+ writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao);
+ fileSizeCountMap.clear();
+ }
+ }
+ } catch (IOException ioEx) {
+ LOG.error("Unable to populate File Size Count for {} in Recon DB.",
taskName, ioEx);
+ return false;
+ }
+ LOG.info("Reprocessed {} keys for bucket layout {}.", totalKeysProcessed,
bucketLayout);
+ return true;
+ }
+
+ /**
+ * Processes a batch of OM update events.
+ *
+ * @param events OM update event batch.
+ * @param tableName The OM DB table (KEY_TABLE or FILE_TABLE) whose events this task processes.
+ * @param dslContext DSLContext for DB operations.
+ * @param fileCountBySizeDao DAO for file count table.
+ * @param taskName The name of the task for logging.
+ * @return A Pair of task name and boolean indicating success.
+ */
+ public static Pair<String, Boolean> processEvents(OMUpdateEventBatch events,
+ String tableName,
+ DSLContext dslContext,
+ FileCountBySizeDao fileCountBySizeDao,
+ String taskName) {
+ Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
+ Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
+ long startTime = System.currentTimeMillis();
+ while (eventIterator.hasNext()) {
+ OMDBUpdateEvent<String, Object> omdbUpdateEvent = eventIterator.next();
+ if (!tableName.equals(omdbUpdateEvent.getTable())) {
+ continue;
+ }
+ String updatedKey = omdbUpdateEvent.getKey();
+ Object value = omdbUpdateEvent.getValue();
+ Object oldValue = omdbUpdateEvent.getOldValue();
+ if (value instanceof OmKeyInfo) {
+ OmKeyInfo omKeyInfo = (OmKeyInfo) value;
+ OmKeyInfo omKeyInfoOld = (OmKeyInfo) oldValue;
+ try {
+ switch (omdbUpdateEvent.getAction()) {
+ case PUT:
+ handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
+ break;
+ case DELETE:
+ handleDeleteKeyEvent(updatedKey, omKeyInfo, fileSizeCountMap);
+ break;
+ case UPDATE:
+ if (omKeyInfoOld != null) {
+ handleDeleteKeyEvent(updatedKey, omKeyInfoOld, fileSizeCountMap);
+ handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
+ } else {
+ LOG.warn("Update event does not have the old keyInfo for {}.",
updatedKey);
+ }
+ break;
+ default:
+ LOG.trace("Skipping DB update event: {}",
omdbUpdateEvent.getAction());
+ }
+ } catch (Exception e) {
+ LOG.error("Unexpected exception while processing key {}.",
updatedKey, e);
+ return new ImmutablePair<>(taskName, false);
+ }
+ } else {
+ LOG.warn("Unexpected value type {} for key {}. Skipping processing.",
+ value.getClass().getName(), updatedKey);
+ }
+ }
+ writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao);
+ LOG.debug("{} successfully processed in {} milliseconds", taskName,
+ (System.currentTimeMillis() - startTime));
+ return new ImmutablePair<>(taskName, true);
+ }
+
+ /**
+ * Writes the accumulated file size counts to the DB.
+ *
+ * @param fileSizeCountMap Map of file size counts.
+ * @param dslContext DSLContext for DB operations.
+ * @param fileCountBySizeDao DAO for file count table.
+ */
+ public static void writeCountsToDB(Map<FileSizeCountKey, Long> fileSizeCountMap,
+ DSLContext dslContext,
+ FileCountBySizeDao fileCountBySizeDao) {
+
+ List<FileCountBySize> insertToDb = new ArrayList<>();
+ List<FileCountBySize> updateInDb = new ArrayList<>();
+ boolean isDbTruncated = isFileCountBySizeTableEmpty(dslContext); // Check if table is empty
+
+ fileSizeCountMap.keySet().forEach((FileSizeCountKey key) -> {
+ FileCountBySize newRecord = new FileCountBySize();
+ newRecord.setVolume(key.volume);
+ newRecord.setBucket(key.bucket);
+ newRecord.setFileSize(key.fileSizeUpperBound);
+ newRecord.setCount(fileSizeCountMap.get(key));
+ if (!isDbTruncated) {
+ // Get the current count from database and update
+ Record3<String, String, Long> recordToFind =
+ dslContext.newRecord(
+ FILE_COUNT_BY_SIZE.VOLUME,
+ FILE_COUNT_BY_SIZE.BUCKET,
+ FILE_COUNT_BY_SIZE.FILE_SIZE)
+ .value1(key.volume)
+ .value2(key.bucket)
+ .value3(key.fileSizeUpperBound);
+ FileCountBySize fileCountRecord =
+ fileCountBySizeDao.findById(recordToFind);
+ if (fileCountRecord == null && newRecord.getCount() > 0L) {
+ // insert new row only for non-zero counts.
+ insertToDb.add(newRecord);
+ } else if (fileCountRecord != null) {
+ newRecord.setCount(fileCountRecord.getCount() +
+ fileSizeCountMap.get(key));
+ updateInDb.add(newRecord);
+ }
+ } else if (newRecord.getCount() > 0) {
+ // insert new row only for non-zero counts.
+ insertToDb.add(newRecord);
+ }
+ });
+ fileCountBySizeDao.insert(insertToDb);
+ fileCountBySizeDao.update(updateInDb);
+ }
+
+ /**
+ * Increments the count for a given key on a PUT event.
+ */
+ public static void handlePutKeyEvent(OmKeyInfo omKeyInfo,
+ Map<FileSizeCountKey, Long> fileSizeCountMap) {
+ FileSizeCountKey key = getFileSizeCountKey(omKeyInfo);
+ Long count = fileSizeCountMap.containsKey(key) ? fileSizeCountMap.get(key) + 1L : 1L;
+ fileSizeCountMap.put(key, count);
+ }
+
+ /**
+ * Decrements the count for a given key on a DELETE event.
+ */
+ public static void handleDeleteKeyEvent(String key, OmKeyInfo omKeyInfo,
+ Map<FileSizeCountKey, Long> fileSizeCountMap) {
+ if (omKeyInfo == null) {
+ LOG.warn("Deleting a key not found while handling DELETE key event. Key
not found in Recon OM DB: {}", key);
+ } else {
+ FileSizeCountKey countKey = getFileSizeCountKey(omKeyInfo);
+ Long count = fileSizeCountMap.containsKey(countKey) ? fileSizeCountMap.get(countKey) - 1L : -1L;
+ fileSizeCountMap.put(countKey, count);
+ }
+ }
+
+ /**
+ * Returns a FileSizeCountKey for the given OmKeyInfo.
+ */
+ public static FileSizeCountKey getFileSizeCountKey(OmKeyInfo omKeyInfo) {
+ return new FileSizeCountKey(omKeyInfo.getVolumeName(),
+ omKeyInfo.getBucketName(),
+ ReconUtils.getFileSizeUpperBound(omKeyInfo.getDataSize()));
+ }
+
+ /**
+ * Checks if the FILE_COUNT_BY_SIZE table is empty.
+ */
+ public static boolean isFileCountBySizeTableEmpty(DSLContext dslContext) {
+ return dslContext.fetchCount(FILE_COUNT_BY_SIZE) == 0;
+ }
+
+ /**
+ * Helper key class used for grouping file size counts.
+ */
+ public static class FileSizeCountKey {
+ private final String volume;
+ private final String bucket;
+ private final Long fileSizeUpperBound;
+
+ public FileSizeCountKey(String volume, String bucket, Long fileSizeUpperBound) {
+ this.volume = volume;
+ this.bucket = bucket;
+ this.fileSizeUpperBound = fileSizeUpperBound;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof FileSizeCountKey) {
+ FileSizeCountKey other = (FileSizeCountKey) obj;
+ return volume.equals(other.volume) &&
+ bucket.equals(other.bucket) &&
+ fileSizeUpperBound.equals(other.fileSizeUpperBound);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return (volume + bucket + fileSizeUpperBound).hashCode();
+ }
+ }
+}
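The helper groups keys by (volume, bucket, file-size upper bound), where the upper bound comes from ReconUtils.getFileSizeUpperBound. A hedged sketch of that binning rule follows, assuming power-of-two bins starting at 1 KB with everything above a 2^50-byte (1 PB) cap collapsing into a single Long.MAX_VALUE bin; this reproduces the values the tests below assert (1000 -> 1024, 100000 -> 131072, 4 PB -> Long.MAX_VALUE), while the authoritative logic lives in ReconUtils:

public class FileSizeBinSketch {
  static final long MAX_BIN = 1125899906842624L; // 2^50 bytes, assumed cap

  // Smallest power-of-two bin (>= 1 KB) that holds the size; huge sizes share one bin.
  static long upperBound(long size) {
    if (size >= MAX_BIN) {
      return Long.MAX_VALUE;
    }
    long bound = 1024L;
    while (bound < size) {
      bound *= 2;
    }
    return bound;
  }

  public static void main(String[] args) {
    System.out.println(upperBound(1000L));                  // 1024
    System.out.println(upperBound(100000L));                // 131072
    System.out.println(upperBound(4L * 1125899906842624L)); // Long.MAX_VALUE (4 PB)
  }
}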
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java
new file mode 100644
index 0000000000..acaab763ac
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import com.google.inject.Inject;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
+import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
+import org.jooq.DSLContext;
+
+/**
+ * Task for ObjectStore (OBS) which processes the KEY_TABLE.
+ */
+public class FileSizeCountTaskOBS implements ReconOmTask {
+
+ private final FileCountBySizeDao fileCountBySizeDao;
+ private final DSLContext dslContext;
+
+ @Inject
+ public FileSizeCountTaskOBS(FileCountBySizeDao fileCountBySizeDao,
+ UtilizationSchemaDefinition utilizationSchemaDefinition) {
+ this.fileCountBySizeDao = fileCountBySizeDao;
+ this.dslContext = utilizationSchemaDefinition.getDSLContext();
+ }
+
+ @Override
+ public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+ return FileSizeCountTaskHelper.reprocess(
+ omMetadataManager,
+ dslContext,
+ fileCountBySizeDao,
+ BucketLayout.OBJECT_STORE,
+ getTaskName()
+ );
+ }
+
+ @Override
+ public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+ // This task listens only on the KEY_TABLE.
+ return FileSizeCountTaskHelper.processEvents(
+ events,
+ OmMetadataManagerImpl.KEY_TABLE,
+ dslContext,
+ fileCountBySizeDao,
+ getTaskName());
+ }
+
+ @Override
+ public String getTaskName() {
+ return "FileSizeCountTaskOBS";
+ }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
index c2a967415c..c1d786db1a 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
@@ -39,6 +39,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.recon.ReconConstants;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.tasks.types.NamedCallableTask;
import org.apache.hadoop.ozone.recon.tasks.types.TaskExecutionException;
@@ -122,6 +123,7 @@ public synchronized void consumeOMEvents(OMUpdateEventBatch events, OMMetadataMa
}
// Reprocess the failed tasks.
+ ReconConstants.resetTableTruncatedFlags();
if (!retryFailedTasks.isEmpty()) {
tasks.clear();
for (String taskName : failedTasks) {
@@ -154,6 +156,7 @@ private void ignoreFailedTasks(List<String> failedTasks) {
@Override
public synchronized void reInitializeTasks(ReconOMMetadataManager
omMetadataManager) {
Collection<NamedCallableTask<Pair<String, Boolean>>> tasks = new
ArrayList<>();
+ ReconConstants.resetTableTruncatedFlags();
for (Map.Entry<String, ReconOmTask> taskEntry :
reconOmTasks.entrySet()) {
ReconOmTask task = taskEntry.getValue();
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
index 5a667bd7c7..52ac1d64f4 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
@@ -27,6 +27,7 @@
import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDeletedKeysToOm;
import static org.apache.hadoop.ozone.recon.spi.impl.PrometheusServiceProviderImpl.PROMETHEUS_INSTANT_QUERY_API;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;
import static org.hadoop.ozone.recon.schema.tables.GlobalStatsTable.GLOBAL_STATS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -131,7 +132,8 @@
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
import org.apache.hadoop.ozone.recon.tasks.ContainerSizeCountTask;
-import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
+import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskFSO;
+import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskOBS;
import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.apache.ozone.test.LambdaTestUtils;
@@ -143,6 +145,7 @@
import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
import org.jooq.Configuration;
import org.jooq.DSLContext;
+import org.jooq.Record3;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -162,7 +165,8 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
private VolumeEndpoint volumeEndpoint;
private BucketEndpoint bucketEndpoint;
private ReconOMMetadataManager reconOMMetadataManager;
- private FileSizeCountTask fileSizeCountTask;
+ private FileSizeCountTaskFSO fileSizeCountTaskFSO;
+ private FileSizeCountTaskOBS fileSizeCountTaskOBS;
private ContainerSizeCountTask containerSizeCountTask;
private OmTableInsightTask omTableInsightTask;
private ReconStorageContainerManagerFacade reconScm;
@@ -305,8 +309,10 @@ private void initializeInjector() throws Exception {
fileCountBySizeDao,
containerCountBySizeDao,
utilizationSchemaDefinition);
- fileSizeCountTask =
- new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition);
+ fileSizeCountTaskFSO =
+ new FileSizeCountTaskFSO(fileCountBySizeDao, utilizationSchemaDefinition);
+ fileSizeCountTaskOBS =
+ new FileSizeCountTaskOBS(fileCountBySizeDao, utilizationSchemaDefinition);
omTableInsightTask =
new OmTableInsightTask(globalStatsDao, sqlConfiguration,
reconOMMetadataManager);
@@ -835,7 +841,7 @@ public void testGetFileCounts() throws Exception {
when(keyTableLegacy.iterator()).thenReturn(mockKeyIterLegacy);
when(keyTableFso.iterator()).thenReturn(mockKeyIterFso);
- when(omMetadataManager.getKeyTable(BucketLayout.LEGACY)).thenReturn(
+ when(omMetadataManager.getKeyTable(BucketLayout.OBJECT_STORE)).thenReturn(
keyTableLegacy);
when(omMetadataManager.getKeyTable(
BucketLayout.FILE_SYSTEM_OPTIMIZED)).thenReturn(keyTableFso);
@@ -862,11 +868,37 @@ public void testGetFileCounts() throws Exception {
.thenReturn(omKeyInfo2)
.thenReturn(omKeyInfo3);
- Pair<String, Boolean> result =
- fileSizeCountTask.reprocess(omMetadataManager);
- assertTrue(result.getRight());
+ // Call reprocess on both endpoints.
+ Pair<String, Boolean> resultOBS = fileSizeCountTaskOBS.reprocess(omMetadataManager);
+ Pair<String, Boolean> resultFSO = fileSizeCountTaskFSO.reprocess(omMetadataManager);
+ assertTrue(resultOBS.getRight());
+ assertTrue(resultFSO.getRight());
+ // The two tasks should result in 3 rows.
assertEquals(3, fileCountBySizeDao.count());
+
+ // Verify counts:
+ // For vol1/bucket1 with fileSize 1000L, the upper bound is 1024L and expected count is 2.
+ Record3<String, String, Long> recordToFind = dslContext.newRecord(
+ FILE_COUNT_BY_SIZE.VOLUME,
+ FILE_COUNT_BY_SIZE.BUCKET,
+ FILE_COUNT_BY_SIZE.FILE_SIZE)
+ .value1("vol1")
+ .value2("bucket1")
+ .value3(1024L);
+ assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+
+ // For vol1/bucket1 with fileSize 100000L, the upper bound is 131072L and expected count is 2.
+ recordToFind.value3(131072L);
+ assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+
+ // For vol2/bucket1 with fileSize 1000L, the upper bound is 1024L and expected count is 2.
+ recordToFind.value1("vol2");
+ recordToFind.value2("bucket1");
+ recordToFind.value3(1024L);
+ assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+
+ // --- Now test the query endpoints of the utilization service ---
Response response = utilizationEndpoint.getFileCounts(null, null, 0);
List<FileCountBySize> resultSet =
(List<FileCountBySize>) response.getEntity();
@@ -881,38 +913,38 @@ public void testGetFileCounts() throws Exception {
o.getBucket().equals("bucket1") && o.getFileSize() == 1024L &&
o.getCount() == 2L));
- // Test for "volume" query param
+ // Test for "volume" query param.
response = utilizationEndpoint.getFileCounts("vol1", null, 0);
resultSet = (List<FileCountBySize>) response.getEntity();
assertEquals(2, resultSet.size());
assertTrue(resultSet.stream().allMatch(o -> o.getVolume().equals("vol1")));
- // Test for non-existent volume
+ // Test for non-existent volume.
response = utilizationEndpoint.getFileCounts("vol", null, 0);
resultSet = (List<FileCountBySize>) response.getEntity();
assertEquals(0, resultSet.size());
- // Test for "volume" + "bucket" query param
+ // Test for "volume" + "bucket" query param.
response = utilizationEndpoint.getFileCounts("vol1", "bucket1", 0);
resultSet = (List<FileCountBySize>) response.getEntity();
assertEquals(2, resultSet.size());
assertTrue(resultSet.stream().allMatch(o -> o.getVolume().equals("vol1") &&
o.getBucket().equals("bucket1")));
- // Test for non-existent bucket
+ // Test for non-existent bucket.
response = utilizationEndpoint.getFileCounts("vol1", "bucket", 0);
resultSet = (List<FileCountBySize>) response.getEntity();
assertEquals(0, resultSet.size());
- // Test for "volume" + "bucket" + "fileSize" query params
+ // Test for "volume" + "bucket" + "fileSize" query params.
response = utilizationEndpoint.getFileCounts("vol1", "bucket1", 131072);
resultSet = (List<FileCountBySize>) response.getEntity();
assertEquals(1, resultSet.size());
FileCountBySize o = resultSet.get(0);
- assertTrue(o.getVolume().equals("vol1") && o.getBucket().equals(
- "bucket1") && o.getFileSize() == 131072);
+ assertTrue(o.getVolume().equals("vol1") && o.getBucket().equals("bucket1") &&
+ o.getFileSize() == 131072);
- // Test for non-existent fileSize
+ // Test for non-existent fileSize.
response = utilizationEndpoint.getFileCounts("vol1", "bucket1", 1310725);
resultSet = (List<FileCountBySize>) response.getEntity();
assertEquals(0, resultSet.size());
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
index 2c22541d02..30ca22154b 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
@@ -22,6 +22,7 @@
import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.UPDATE;
import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.AdditionalAnswers.returnsElementsOf;
import static org.mockito.BDDMockito.given;
@@ -55,8 +56,10 @@
public class TestFileSizeCountTask extends AbstractReconSqlDBTest {
private FileCountBySizeDao fileCountBySizeDao;
- private FileSizeCountTask fileSizeCountTask;
+ private FileSizeCountTaskOBS fileSizeCountTaskOBS;
+ private FileSizeCountTaskFSO fileSizeCountTaskFSO;
private DSLContext dslContext;
+ private UtilizationSchemaDefinition utilizationSchemaDefinition;
public TestFileSizeCountTask() {
super();
@@ -65,24 +68,28 @@ public TestFileSizeCountTask() {
@BeforeEach
public void setUp() {
fileCountBySizeDao = getDao(FileCountBySizeDao.class);
- UtilizationSchemaDefinition utilizationSchemaDefinition =
- getSchemaDefinition(UtilizationSchemaDefinition.class);
- fileSizeCountTask =
- new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition);
+ utilizationSchemaDefinition = getSchemaDefinition(UtilizationSchemaDefinition.class);
+ // Create separate task instances.
+ fileSizeCountTaskOBS = new FileSizeCountTaskOBS(fileCountBySizeDao, utilizationSchemaDefinition);
+ fileSizeCountTaskFSO = new FileSizeCountTaskFSO(fileCountBySizeDao, utilizationSchemaDefinition);
dslContext = utilizationSchemaDefinition.getDSLContext();
- // Truncate table before running each test
+ // Truncate table before each test.
dslContext.truncate(FILE_COUNT_BY_SIZE);
}
@Test
public void testReprocess() throws IOException {
+ // Create three sample OmKeyInfo objects.
OmKeyInfo[] omKeyInfos = new OmKeyInfo[3];
String[] keyNames = {"key1", "key2", "key3"};
String[] volumeNames = {"vol1", "vol1", "vol1"};
String[] bucketNames = {"bucket1", "bucket1", "bucket1"};
+ // Use sizes so that each falls into a distinct bin:
+ // - 1000L falls into first bin (upper bound 1024L)
+ // - 100000L falls into second bin (upper bound 131072L)
+ // - 4PB (i.e. 1125899906842624L * 4) falls into the highest bin (upper bound Long.MAX_VALUE)
Long[] dataSizes = {1000L, 100000L, 1125899906842624L * 4};
- // Loop to initialize each instance of OmKeyInfo
for (int i = 0; i < 3; i++) {
omKeyInfos[i] = mock(OmKeyInfo.class);
given(omKeyInfos[i].getKeyName()).willReturn(keyNames[i]);
@@ -91,88 +98,82 @@ public void testReprocess() throws IOException {
given(omKeyInfos[i].getDataSize()).willReturn(dataSizes[i]);
}
- // Create two mock instances of TypedTable, one for FILE_SYSTEM_OPTIMIZED
- // layout and one for LEGACY layout
+ // Prepare the OMMetadataManager mock.
OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
- TypedTable<String, OmKeyInfo> keyTableLegacy = mock(TypedTable.class);
- TypedTable<String, OmKeyInfo> keyTableFso = mock(TypedTable.class);
- // Set return values for getKeyTable() for FILE_SYSTEM_OPTIMIZED
- // and LEGACY layout
- when(omMetadataManager.getKeyTable(eq(BucketLayout.LEGACY)))
- .thenReturn(keyTableLegacy);
+ // Configure the OBS (OBJECT_STORE) endpoint.
+ TypedTable<String, OmKeyInfo> keyTableOBS = mock(TypedTable.class);
+ // Note: Even though legacy and OBS share the same underlying table, we simulate OBS here.
+ when(omMetadataManager.getKeyTable(eq(BucketLayout.OBJECT_STORE)))
+ .thenReturn(keyTableOBS);
+ TypedTable.TypedTableIterator mockIterOBS = mock(TypedTable.TypedTableIterator.class);
+ when(keyTableOBS.iterator()).thenReturn(mockIterOBS);
+ // Simulate three keys then end.
+ when(mockIterOBS.hasNext()).thenReturn(true, true, true, false);
+ TypedTable.TypedKeyValue mockKeyValueOBS = mock(TypedTable.TypedKeyValue.class);
+ when(mockIterOBS.next()).thenReturn(mockKeyValueOBS);
+ when(mockKeyValueOBS.getValue()).thenReturn(omKeyInfos[0], omKeyInfos[1], omKeyInfos[2]);
+
+ // Configure the FSO (FILE_SYSTEM_OPTIMIZED) endpoint.
+ TypedTable<String, OmKeyInfo> keyTableFSO = mock(TypedTable.class);
when(omMetadataManager.getKeyTable(eq(BucketLayout.FILE_SYSTEM_OPTIMIZED)))
- .thenReturn(keyTableFso);
-
- // Create two mock instances of TypedTableIterator, one for each
- // instance of TypedTable
- TypedTable.TypedTableIterator mockKeyIterLegacy =
- mock(TypedTable.TypedTableIterator.class);
- when(keyTableLegacy.iterator()).thenReturn(mockKeyIterLegacy);
- // Set return values for hasNext() and next() of the mock instance of
- // TypedTableIterator for keyTableLegacy
- when(mockKeyIterLegacy.hasNext()).thenReturn(true, true, true, false);
- TypedTable.TypedKeyValue mockKeyValueLegacy =
- mock(TypedTable.TypedKeyValue.class);
- when(mockKeyIterLegacy.next()).thenReturn(mockKeyValueLegacy);
- when(mockKeyValueLegacy.getValue()).thenReturn(omKeyInfos[0],
omKeyInfos[1],
- omKeyInfos[2]);
-
-
- // Same as above, but for keyTableFso
- TypedTable.TypedTableIterator mockKeyIterFso =
- mock(TypedTable.TypedTableIterator.class);
- when(keyTableFso.iterator()).thenReturn(mockKeyIterFso);
- when(mockKeyIterFso.hasNext()).thenReturn(true, true, true, false);
- TypedTable.TypedKeyValue mockKeyValueFso =
- mock(TypedTable.TypedKeyValue.class);
- when(mockKeyIterFso.next()).thenReturn(mockKeyValueFso);
- when(mockKeyValueFso.getValue()).thenReturn(omKeyInfos[0], omKeyInfos[1],
- omKeyInfos[2]);
-
- // Reprocess could be called from table having existing entries. Adding
- // an entry to simulate that.
- fileCountBySizeDao.insert(
- new FileCountBySize("vol1", "bucket1", 1024L, 10L));
-
- Pair<String, Boolean> result =
- fileSizeCountTask.reprocess(omMetadataManager);
-
- // Verify that the result of reprocess is true
- assertTrue(result.getRight());
-
- // Verify that the number of entries in fileCountBySizeDao is 3
- assertEquals(3, fileCountBySizeDao.count());
-
- // Create a record to find the count of files in a specific volume,
- // bucket and file size
- Record3<String, String, Long> recordToFind = dslContext
- .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+ .thenReturn(keyTableFSO);
+ TypedTable.TypedTableIterator mockIterFSO = mock(TypedTable.TypedTableIterator.class);
+ when(keyTableFSO.iterator()).thenReturn(mockIterFSO);
+ when(mockIterFSO.hasNext()).thenReturn(true, true, true, false);
+ TypedTable.TypedKeyValue mockKeyValueFSO = mock(TypedTable.TypedKeyValue.class);
+ when(mockIterFSO.next()).thenReturn(mockKeyValueFSO);
+ when(mockKeyValueFSO.getValue()).thenReturn(omKeyInfos[0], omKeyInfos[1], omKeyInfos[2]);
+
+ // Simulate a preexisting entry in the DB.
+ // (This record will be removed by the first task via table truncation.)
+ fileCountBySizeDao.insert(new FileCountBySize("vol1", "bucket1", 1024L, 10L));
+
+ // Call reprocess on both tasks.
+ Pair<String, Boolean> resultOBS = fileSizeCountTaskOBS.reprocess(omMetadataManager);
+ Pair<String, Boolean> resultFSO = fileSizeCountTaskFSO.reprocess(omMetadataManager);
+
+ // Verify that both tasks reported success.
+ assertTrue(resultOBS.getRight(), "OBS reprocess should return true");
+ assertTrue(resultFSO.getRight(), "FSO reprocess should return true");
+
+ // After processing, there should be 3 rows (one per bin).
+ assertEquals(3, fileCountBySizeDao.count(), "Expected 3 rows in the DB");
+
+ // Now verify the counts in each bin.
+ // Because each task processes the same 3 keys and each key contributes a count of 1,
+ // the final count per bin should be 2 (1 from OBS + 1 from FSO).
+
+ // Verify bin for key size 1000L -> upper bound 1024L.
+ Record3<String, String, Long> recordToFind = dslContext.newRecord(
+ FILE_COUNT_BY_SIZE.VOLUME,
FILE_COUNT_BY_SIZE.BUCKET,
FILE_COUNT_BY_SIZE.FILE_SIZE)
.value1("vol1")
.value2("bucket1")
.value3(1024L);
- assertEquals(2L,
- fileCountBySizeDao.findById(recordToFind).getCount().longValue());
- // file size upper bound for 100000L is 131072L (next highest power of 2)
+ assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue(),
+ "Expected bin 1024 to have count 2");
+
+ // Verify bin for key size 100000L -> upper bound 131072L.
recordToFind.value3(131072L);
- assertEquals(2L,
- fileCountBySizeDao.findById(recordToFind).getCount().longValue());
- // file size upper bound for 4PB is Long.MAX_VALUE
+ assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue(),
+ "Expected bin 131072 to have count 2");
+
+ // Verify bin for key size 4PB -> upper bound Long.MAX_VALUE.
recordToFind.value3(Long.MAX_VALUE);
- assertEquals(2L,
- fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+ assertEquals(2L, fileCountBySizeDao.findById(recordToFind).getCount().longValue(),
+ "Expected bin Long.MAX_VALUE to have count 2");
}
@Test
public void testProcess() {
- // Write 2 keys.
+ // First batch: Write 2 keys.
OmKeyInfo toBeDeletedKey = mock(OmKeyInfo.class);
given(toBeDeletedKey.getVolumeName()).willReturn("vol1");
given(toBeDeletedKey.getBucketName()).willReturn("bucket1");
given(toBeDeletedKey.getKeyName()).willReturn("deletedKey");
- given(toBeDeletedKey.getDataSize()).willReturn(2000L); // Bin 1
+ given(toBeDeletedKey.getDataSize()).willReturn(2000L); // Falls in bin with upper bound 2048L
OMDBUpdateEvent event = new OMUpdateEventBuilder()
.setAction(PUT)
.setKey("deletedKey")
@@ -184,7 +185,7 @@ public void testProcess() {
given(toBeUpdatedKey.getVolumeName()).willReturn("vol1");
given(toBeUpdatedKey.getBucketName()).willReturn("bucket1");
given(toBeUpdatedKey.getKeyName()).willReturn("updatedKey");
- given(toBeUpdatedKey.getDataSize()).willReturn(10000L); // Bin 4
+ given(toBeUpdatedKey.getDataSize()).willReturn(10000L); // Falls in bin with upper bound 16384L
OMDBUpdateEvent event2 = new OMUpdateEventBuilder()
.setAction(PUT)
.setKey("updatedKey")
@@ -194,9 +195,14 @@ public void testProcess() {
OMUpdateEventBatch omUpdateEventBatch =
new OMUpdateEventBatch(Arrays.asList(event, event2), 0L);
- fileSizeCountTask.process(omUpdateEventBatch);
- // Verify 2 keys are in correct bins.
+ // Process the same batch on both endpoints.
+ fileSizeCountTaskOBS.process(omUpdateEventBatch);
+ fileSizeCountTaskFSO.process(omUpdateEventBatch);
+
+ // After processing the first batch:
+ // Since each endpoint processes the same events, the counts are doubled.
+ // Expected: 2 rows (bins 2048 and 16384) with counts 2 each.
assertEquals(2, fileCountBySizeDao.count());
Record3<String, String, Long> recordToFind = dslContext
.newRecord(FILE_COUNT_BY_SIZE.VOLUME,
@@ -212,6 +218,7 @@ public void testProcess() {
assertEquals(1L,
fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+ // Second batch: Process update events.
// Add new key.
OmKeyInfo newKey = mock(OmKeyInfo.class);
given(newKey.getVolumeName()).willReturn("vol1");
@@ -249,7 +256,8 @@ public void testProcess() {
omUpdateEventBatch = new OMUpdateEventBatch(
Arrays.asList(updateEvent, putEvent, deleteEvent), 0L);
- fileSizeCountTask.process(omUpdateEventBatch);
+ fileSizeCountTaskOBS.process(omUpdateEventBatch);
+ fileSizeCountTaskFSO.process(omUpdateEventBatch);
assertEquals(4, fileCountBySizeDao.count());
recordToFind.value3(1024L);
@@ -268,8 +276,7 @@ public void testProcess() {
@Test
public void testReprocessAtScale() throws IOException {
- // generate mocks for 2 volumes, 500 buckets each volume
- // and 42 keys in each bucket.
+ // generate mocks for 2 volumes, 500 buckets each volume, and 42 keys in each bucket.
List<OmKeyInfo> omKeyInfoList = new ArrayList<>();
List<Boolean> hasNextAnswer = new ArrayList<>();
for (int volIndex = 1; volIndex <= 2; volIndex++) {
@@ -279,8 +286,8 @@ public void testReprocessAtScale() throws IOException {
given(omKeyInfo.getKeyName()).willReturn("key" + keyIndex);
given(omKeyInfo.getVolumeName()).willReturn("vol" + volIndex);
given(omKeyInfo.getBucketName()).willReturn("bucket" + bktIndex);
- // Place keys in each bin
- long fileSize = (long)Math.pow(2, keyIndex + 9) - 1L;
+ // Each key's fileSize = 2^(keyIndex+9) - 1, so that it falls into its respective bin.
+ long fileSize = (long) Math.pow(2, keyIndex + 9) - 1L;
given(omKeyInfo.getDataSize()).willReturn(fileSize);
omKeyInfoList.add(omKeyInfo);
hasNextAnswer.add(true);
@@ -290,46 +297,43 @@ public void testReprocessAtScale() throws IOException {
hasNextAnswer.add(false);
OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
+ // Create two mock key tables: one for OBS (using LEGACY in this test) and one for FSO.
TypedTable<String, OmKeyInfo> keyTableLegacy = mock(TypedTable.class);
TypedTable<String, OmKeyInfo> keyTableFso = mock(TypedTable.class);
- TypedTable.TypedTableIterator mockKeyIterLegacy = mock(TypedTable
- .TypedTableIterator.class);
- TypedTable.TypedTableIterator mockKeyIterFso = mock(TypedTable
- .TypedTableIterator.class);
- TypedTable.TypedKeyValue mockKeyValueLegacy = mock(
- TypedTable.TypedKeyValue.class);
- TypedTable.TypedKeyValue mockKeyValueFso = mock(
- TypedTable.TypedKeyValue.class);
+ TypedTable.TypedTableIterator mockKeyIterLegacy = mock(TypedTable.TypedTableIterator.class);
+ TypedTable.TypedTableIterator mockKeyIterFso = mock(TypedTable.TypedTableIterator.class);
+ TypedTable.TypedKeyValue mockKeyValueLegacy = mock(TypedTable.TypedKeyValue.class);
+ TypedTable.TypedKeyValue mockKeyValueFso = mock(TypedTable.TypedKeyValue.class);
when(keyTableLegacy.iterator()).thenReturn(mockKeyIterLegacy);
when(keyTableFso.iterator()).thenReturn(mockKeyIterFso);
- when(omMetadataManager.getKeyTable(BucketLayout.LEGACY))
- .thenReturn(keyTableLegacy);
- when(omMetadataManager.getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED))
- .thenReturn(keyTableFso);
+ // In this test, assume OBS task uses BucketLayout.LEGACY and FSO uses FILE_SYSTEM_OPTIMIZED.
+ when(omMetadataManager.getKeyTable(BucketLayout.OBJECT_STORE)).thenReturn(keyTableLegacy);
+ when(omMetadataManager.getKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED)).thenReturn(keyTableFso);
- when(mockKeyIterLegacy.hasNext())
- .thenAnswer(returnsElementsOf(hasNextAnswer));
- when(mockKeyIterFso.hasNext())
- .thenAnswer(returnsElementsOf(hasNextAnswer));
+ when(mockKeyIterLegacy.hasNext()).thenAnswer(returnsElementsOf(hasNextAnswer));
+ when(mockKeyIterFso.hasNext()).thenAnswer(returnsElementsOf(hasNextAnswer));
when(mockKeyIterLegacy.next()).thenReturn(mockKeyValueLegacy);
when(mockKeyIterFso.next()).thenReturn(mockKeyValueFso);
- when(mockKeyValueLegacy.getValue())
- .thenAnswer(returnsElementsOf(omKeyInfoList));
- when(mockKeyValueFso.getValue())
- .thenAnswer(returnsElementsOf(omKeyInfoList));
+ when(mockKeyValueLegacy.getValue()).thenAnswer(returnsElementsOf(omKeyInfoList));
+ when(mockKeyValueFso.getValue()).thenAnswer(returnsElementsOf(omKeyInfoList));
- Pair<String, Boolean> result =
- fileSizeCountTask.reprocess(omMetadataManager);
- assertTrue(result.getRight());
+ // Call reprocess on both endpoints.
+ Pair<String, Boolean> resultOBS = fileSizeCountTaskOBS.reprocess(omMetadataManager);
+ Pair<String, Boolean> resultFSO = fileSizeCountTaskFSO.reprocess(omMetadataManager);
+ assertTrue(resultOBS.getRight());
+ assertTrue(resultFSO.getRight());
// 2 volumes * 500 buckets * 42 bins = 42000 rows
assertEquals(42000, fileCountBySizeDao.count());
- Record3<String, String, Long> recordToFind = dslContext
- .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+
+ // Verify counts for a few representative bins.
+ // For volume "vol1", bucket "bucket1", the first bin (upper bound 1024L)
should have a count of 2.
+ Record3<String, String, Long> recordToFind = dslContext.newRecord(
+ FILE_COUNT_BY_SIZE.VOLUME,
FILE_COUNT_BY_SIZE.BUCKET,
FILE_COUNT_BY_SIZE.FILE_SIZE)
.value1("vol1")
@@ -337,11 +341,13 @@ public void testReprocessAtScale() throws IOException {
.value3(1024L);
assertEquals(2L,
fileCountBySizeDao.findById(recordToFind).getCount().longValue());
- // file size upper bound for 100000L is 131072L (next highest power of 2)
- recordToFind.value1("vol1");
+
+ // For volume "vol1", bucket "bucket1", the bin with upper bound 131072L
should have a count of 2.
recordToFind.value3(131072L);
assertEquals(2L,
fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+
+ // For volume "vol1", bucket "bucket500", the highest bin (upper bound
Long.MAX_VALUE) should have a count of 2.
recordToFind.value2("bucket500");
recordToFind.value3(Long.MAX_VALUE);
assertEquals(2L,
@@ -385,9 +391,10 @@ public void testProcessAtScale() {
}
}
- OMUpdateEventBatch omUpdateEventBatch =
- new OMUpdateEventBatch(omDbEventList, 0L);
- fileSizeCountTask.process(omUpdateEventBatch);
+ OMUpdateEventBatch omUpdateEventBatch = new OMUpdateEventBatch(omDbEventList, 0L);
+ // Process the same batch on both endpoints.
+ fileSizeCountTaskOBS.process(omUpdateEventBatch);
+ fileSizeCountTaskFSO.process(omUpdateEventBatch);
// Verify 2 keys are in correct bins.
assertEquals(10000, fileCountBySizeDao.count());
@@ -463,7 +470,8 @@ public void testProcessAtScale() {
}
omUpdateEventBatch = new OMUpdateEventBatch(omDbEventList, 0L);
- fileSizeCountTask.process(omUpdateEventBatch);
+ fileSizeCountTaskOBS.process(omUpdateEventBatch);
+ fileSizeCountTaskFSO.process(omUpdateEventBatch);
assertEquals(10000, fileCountBySizeDao.count());
recordToFind = dslContext
@@ -488,4 +496,34 @@ public void testProcessAtScale() {
.getCount().longValue());
}
+
+ @Test
+ public void testTruncateTableExceptionPropagation() {
+ // Mock DSLContext and FileCountBySizeDao
+ DSLContext mockDslContext = mock(DSLContext.class);
+ FileCountBySizeDao mockDao = mock(FileCountBySizeDao.class);
+
+ // Mock schema definition and ensure it returns our mocked DSLContext
+ UtilizationSchemaDefinition mockSchema = mock(UtilizationSchemaDefinition.class);
+ when(mockSchema.getDSLContext()).thenReturn(mockDslContext);
+
+ // Mock delete operation to throw an exception
+ when(mockDslContext.delete(FILE_COUNT_BY_SIZE))
+ .thenThrow(new RuntimeException("Simulated DB failure"));
+
+ // Create instances of FileSizeCountTaskOBS and FileSizeCountTaskFSO using mocks.
+ fileSizeCountTaskOBS = new FileSizeCountTaskOBS(mockDao, mockSchema);
+ fileSizeCountTaskFSO = new FileSizeCountTaskFSO(mockDao, mockSchema);
+
+ // Mock OMMetadataManager
+ OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
+
+ // Verify that an exception is thrown from reprocess() for both tasks.
+ assertThrows(RuntimeException.class, () -> fileSizeCountTaskOBS.reprocess(omMetadataManager),
+ "Expected reprocess to propagate exception but it didn't.");
+
+ assertThrows(RuntimeException.class, () -> fileSizeCountTaskFSO.reprocess(omMetadataManager),
+ "Expected reprocess to propagate exception but it didn't.");
+ }
+
}