This is an automated email from the ASF dual-hosted git repository.
arafat2198 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 393211a6e8 HDDS-12062. Recon - Error handling in NSSummaryTask to
avoid data inconsistencies. (#7723)
393211a6e8 is described below
commit 393211a6e8b4a591c729f3dd2a23320baa60f132
Author: Devesh Kumar Singh <[email protected]>
AuthorDate: Mon Feb 24 18:10:24 2025 +0530
HDDS-12062. Recon - Error handling in NSSummaryTask to avoid data
inconsistencies. (#7723)
---
.../spi/impl/OzoneManagerServiceProviderImpl.java | 2 +
.../ozone/recon/tasks/ContainerKeyMapperTask.java | 21 +-
.../ozone/recon/tasks/FileSizeCountTask.java | 333 ---------------------
.../ozone/recon/tasks/FileSizeCountTaskFSO.java | 6 +-
.../ozone/recon/tasks/FileSizeCountTaskHelper.java | 37 ++-
.../ozone/recon/tasks/FileSizeCountTaskOBS.java | 6 +-
.../hadoop/ozone/recon/tasks/NSSummaryTask.java | 102 +++++--
.../recon/tasks/NSSummaryTaskDbEventHandler.java | 20 +-
.../ozone/recon/tasks/NSSummaryTaskWithFSO.java | 45 ++-
.../ozone/recon/tasks/NSSummaryTaskWithLegacy.java | 41 ++-
.../ozone/recon/tasks/NSSummaryTaskWithOBS.java | 44 ++-
.../ozone/recon/tasks/OmTableInsightTask.java | 20 +-
.../hadoop/ozone/recon/tasks/ReconOmTask.java | 110 ++++++-
.../ozone/recon/tasks/ReconTaskControllerImpl.java | 68 +++--
.../ozone/recon/api/TestContainerEndpoint.java | 4 +-
.../hadoop/ozone/recon/api/TestEndpoints.java | 14 +-
.../recon/api/TestNSSummaryDiskUsageOrdering.java | 2 +-
.../recon/api/TestNSSummaryEndpointWithFSO.java | 2 +-
.../recon/api/TestNSSummaryEndpointWithLegacy.java | 2 +-
.../api/TestNSSummaryEndpointWithOBSAndLegacy.java | 4 +-
.../ozone/recon/api/TestOmDBInsightEndPoint.java | 6 +-
.../recon/api/TestOpenKeysSearchEndpoint.java | 2 +-
.../hadoop/ozone/recon/tasks/DummyReconDBTask.java | 16 +-
.../recon/tasks/TestContainerKeyMapperTask.java | 6 +-
.../ozone/recon/tasks/TestFileSizeCountTask.java | 34 +--
.../ozone/recon/tasks/TestNSSummaryTask.java | 3 +-
.../recon/tasks/TestNSSummaryTaskWithFSO.java | 108 ++++++-
.../recon/tasks/TestNSSummaryTaskWithLegacy.java | 13 +-
.../TestNSSummaryTaskWithLegacyOBSLayout.java | 10 +-
.../recon/tasks/TestNSSummaryTaskWithOBS.java | 16 +-
.../ozone/recon/tasks/TestOmTableInsightTask.java | 42 +--
.../recon/tasks/TestReconTaskControllerImpl.java | 14 +-
32 files changed, 574 insertions(+), 579 deletions(-)
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
index abe85e4763..a096268337 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
@@ -501,7 +501,9 @@ ImmutablePair<Boolean, Long>
innerGetAndApplyDeltaUpdatesFromOM(long fromSequenc
}
for (byte[] data : dbUpdates.getData()) {
try (ManagedWriteBatch writeBatch = new ManagedWriteBatch(data)) {
+ // Events get populated in the events list in OMDBUpdatesHandler via
callbacks for put/delete/update
writeBatch.iterate(omdbUpdatesHandler);
+ // Commit the OM DB transactions to the Recon RocksDB and sync here.
try (RDBBatchOperation rdbBatchOperation =
new RDBBatchOperation(writeBatch)) {
try (ManagedWriteOptions wOpts = new ManagedWriteOptions()) {
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
index 6e6390d324..e42e021b9e 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
@@ -34,8 +34,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
@@ -82,7 +80,7 @@ public ContainerKeyMapperTask(ReconContainerMetadataManager
* (container, key) -> count to Recon Container DB.
*/
@Override
- public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+ public TaskResult reprocess(OMMetadataManager omMetadataManager) {
long omKeyCount = 0;
// In-memory maps for fast look up and batch write
@@ -118,7 +116,7 @@ public Pair<String, Boolean> reprocess(OMMetadataManager
omMetadataManager) {
containerKeyCountMap);
if (!checkAndCallFlushToDB(containerKeyMap)) {
LOG.error("Unable to flush containerKey information to the DB");
- return new ImmutablePair<>(getTaskName(), false);
+ return buildTaskResult(false);
}
omKeyCount++;
}
@@ -131,7 +129,7 @@ public Pair<String, Boolean> reprocess(OMMetadataManager
omMetadataManager) {
containerKeyCountMap)) {
LOG.error("Unable to flush Container Key Count and " +
"remaining Container Key information to the DB");
- return new ImmutablePair<>(getTaskName(), false);
+ return buildTaskResult(false);
}
LOG.debug("Completed 'reprocess' of ContainerKeyMapperTask.");
@@ -142,9 +140,9 @@ public Pair<String, Boolean> reprocess(OMMetadataManager
omMetadataManager) {
} catch (IOException ioEx) {
LOG.error("Unable to populate Container Key data in Recon DB. ",
ioEx);
- return new ImmutablePair<>(getTaskName(), false);
+ return buildTaskResult(false);
}
- return new ImmutablePair<>(getTaskName(), true);
+ return buildTaskResult(true);
}
private boolean flushAndCommitContainerKeyInfoToDB(
@@ -189,7 +187,8 @@ public Collection<String> getTaskTables() {
}
@Override
- public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+ public TaskResult process(OMUpdateEventBatch events,
+ Map<String, Integer> subTaskSeekPosMap) {
Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
int eventCount = 0;
final Collection<String> taskTables = getTaskTables();
@@ -246,18 +245,18 @@ public Pair<String, Boolean> process(OMUpdateEventBatch
events) {
} catch (IOException e) {
LOG.error("Unexpected exception while updating key data : {} ",
updatedKey, e);
- return new ImmutablePair<>(getTaskName(), false);
+ return buildTaskResult(false);
}
}
try {
writeToTheDB(containerKeyMap, containerKeyCountMap, deletedKeyCountList);
} catch (IOException e) {
LOG.error("Unable to write Container Key Prefix data in Recon DB.", e);
- return new ImmutablePair<>(getTaskName(), false);
+ return buildTaskResult(false);
}
LOG.debug("{} successfully processed {} OM DB update event(s) in {}
milliseconds.",
getTaskName(), eventCount, (System.currentTimeMillis() - startTime));
- return new ImmutablePair<>(getTaskName(), true);
+ return buildTaskResult(true);
}
private void writeToTheDB(Map<ContainerKeyPrefix, Integer> containerKeyMap,
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
deleted file mode 100644
index 67da0d6c78..0000000000
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.recon.tasks;
-
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
-import static
org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;
-
-import com.google.inject.Inject;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.helpers.BucketLayout;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.recon.ReconUtils;
-import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
-import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
-import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
-import org.jooq.DSLContext;
-import org.jooq.Record3;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class to iterate over the OM DB and store the counts of existing/new
- * files binned into ranges (1KB, 2Kb..,4MB,.., 1TB,..1PB) to the Recon
- * fileSize DB.
- */
-public class FileSizeCountTask implements ReconOmTask {
- private static final Logger LOG =
- LoggerFactory.getLogger(FileSizeCountTask.class);
-
- private FileCountBySizeDao fileCountBySizeDao;
- private DSLContext dslContext;
-
- @Inject
- public FileSizeCountTask(FileCountBySizeDao fileCountBySizeDao,
- UtilizationSchemaDefinition
- utilizationSchemaDefinition) {
- this.fileCountBySizeDao = fileCountBySizeDao;
- this.dslContext = utilizationSchemaDefinition.getDSLContext();
- }
-
- /**
- * Read the Keys from OM snapshot DB and calculate the upper bound of
- * File Size it belongs to.
- *
- * @param omMetadataManager OM Metadata instance.
- * @return Pair
- */
- @Override
- public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
- // Map to store the count of files based on file size
- Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
-
- // Delete all records from FILE_COUNT_BY_SIZE table
- int execute = dslContext.delete(FILE_COUNT_BY_SIZE).execute();
- LOG.debug("Deleted {} records from {}", execute, FILE_COUNT_BY_SIZE);
-
- // Call reprocessBucket method for FILE_SYSTEM_OPTIMIZED bucket layout
- boolean statusFSO =
- reprocessBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED,
- omMetadataManager,
- fileSizeCountMap);
- // Call reprocessBucket method for LEGACY bucket layout
- boolean statusOBS =
- reprocessBucketLayout(BucketLayout.LEGACY, omMetadataManager,
- fileSizeCountMap);
- if (!statusFSO && !statusOBS) {
- return new ImmutablePair<>(getTaskName(), false);
- }
- writeCountsToDB(fileSizeCountMap);
- LOG.debug("Completed a 'reprocess' run of FileSizeCountTask.");
- return new ImmutablePair<>(getTaskName(), true);
- }
-
- private boolean reprocessBucketLayout(BucketLayout bucketLayout,
- OMMetadataManager omMetadataManager,
- Map<FileSizeCountKey, Long> fileSizeCountMap) {
- Table<String, OmKeyInfo> omKeyInfoTable =
- omMetadataManager.getKeyTable(bucketLayout);
- try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
- keyIter = omKeyInfoTable.iterator()) {
- while (keyIter.hasNext()) {
- Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
- handlePutKeyEvent(kv.getValue(), fileSizeCountMap);
- // The time complexity of .size() method is constant time, O(1)
- if (fileSizeCountMap.size() >= 100000) {
- writeCountsToDB(fileSizeCountMap);
- fileSizeCountMap.clear();
- }
- }
- } catch (IOException ioEx) {
- LOG.error("Unable to populate File Size Count for " + bucketLayout +
- " in Recon DB. ", ioEx);
- return false;
- }
- return true;
- }
-
- @Override
- public String getTaskName() {
- return "FileSizeCountTask";
- }
-
- public Collection<String> getTaskTables() {
- List<String> taskTables = new ArrayList<>();
- taskTables.add(KEY_TABLE);
- taskTables.add(FILE_TABLE);
- return taskTables;
- }
-
- /**
- * Read the Keys from update events and update the count of files
- * pertaining to a certain upper bound.
- *
- * @param events Update events - PUT/DELETE.
- * @return Pair
- */
- @Override
- public Pair<String, Boolean> process(OMUpdateEventBatch events) {
- Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
- Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
- final Collection<String> taskTables = getTaskTables();
-
- long startTime = System.currentTimeMillis();
- while (eventIterator.hasNext()) {
- OMDBUpdateEvent<String, Object> omdbUpdateEvent = eventIterator.next();
- // Filter event inside process method to avoid duping
- if (!taskTables.contains(omdbUpdateEvent.getTable())) {
- continue;
- }
- String updatedKey = omdbUpdateEvent.getKey();
- Object value = omdbUpdateEvent.getValue();
- Object oldValue = omdbUpdateEvent.getOldValue();
-
- if (value instanceof OmKeyInfo) {
- OmKeyInfo omKeyInfo = (OmKeyInfo) value;
- OmKeyInfo omKeyInfoOld = (OmKeyInfo) oldValue;
-
- try {
- switch (omdbUpdateEvent.getAction()) {
- case PUT:
- handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
- break;
-
- case DELETE:
- handleDeleteKeyEvent(updatedKey, omKeyInfo, fileSizeCountMap);
- break;
-
- case UPDATE:
- if (omKeyInfoOld != null) {
- handleDeleteKeyEvent(updatedKey, omKeyInfoOld, fileSizeCountMap);
- handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
- } else {
- LOG.warn("Update event does not have the old keyInfo for {}.",
- updatedKey);
- }
- break;
-
- default:
- LOG.trace("Skipping DB update event : {}",
- omdbUpdateEvent.getAction());
- }
- } catch (Exception e) {
- LOG.error("Unexpected exception while processing key {}.",
- updatedKey, e);
- return new ImmutablePair<>(getTaskName(), false);
- }
- } else {
- LOG.warn("Unexpected value type {} for key {}. Skipping processing.",
- value.getClass().getName(), updatedKey);
- }
- }
- writeCountsToDB(fileSizeCountMap);
- LOG.debug("{} successfully processed in {} milliseconds",
- getTaskName(), (System.currentTimeMillis() - startTime));
- return new ImmutablePair<>(getTaskName(), true);
- }
-
- /**
- * Populate DB with the counts of file sizes calculated
- * using the dao.
- *
- */
- private void writeCountsToDB(Map<FileSizeCountKey, Long> fileSizeCountMap) {
-
- List<FileCountBySize> insertToDb = new ArrayList<>();
- List<FileCountBySize> updateInDb = new ArrayList<>();
- boolean isDbTruncated = isFileCountBySizeTableEmpty(); // Check if table
is empty
-
- fileSizeCountMap.keySet().forEach((FileSizeCountKey key) -> {
- FileCountBySize newRecord = new FileCountBySize();
- newRecord.setVolume(key.volume);
- newRecord.setBucket(key.bucket);
- newRecord.setFileSize(key.fileSizeUpperBound);
- newRecord.setCount(fileSizeCountMap.get(key));
- if (!isDbTruncated) {
- // Get the current count from database and update
- Record3<String, String, Long> recordToFind =
- dslContext.newRecord(
- FILE_COUNT_BY_SIZE.VOLUME,
- FILE_COUNT_BY_SIZE.BUCKET,
- FILE_COUNT_BY_SIZE.FILE_SIZE)
- .value1(key.volume)
- .value2(key.bucket)
- .value3(key.fileSizeUpperBound);
- FileCountBySize fileCountRecord =
- fileCountBySizeDao.findById(recordToFind);
- if (fileCountRecord == null && newRecord.getCount() > 0L) {
- // insert new row only for non-zero counts.
- insertToDb.add(newRecord);
- } else if (fileCountRecord != null) {
- newRecord.setCount(fileCountRecord.getCount() +
- fileSizeCountMap.get(key));
- updateInDb.add(newRecord);
- }
- } else if (newRecord.getCount() > 0) {
- // insert new row only for non-zero counts.
- insertToDb.add(newRecord);
- }
- });
- fileCountBySizeDao.insert(insertToDb);
- fileCountBySizeDao.update(updateInDb);
- }
-
- private FileSizeCountKey getFileSizeCountKey(OmKeyInfo omKeyInfo) {
- return new FileSizeCountKey(omKeyInfo.getVolumeName(),
- omKeyInfo.getBucketName(),
- ReconUtils.getFileSizeUpperBound(omKeyInfo.getDataSize()));
- }
-
- /**
- * Calculate and update the count of files being tracked by
- * fileSizeCountMap.
- * Used by reprocess() and process().
- *
- * @param omKeyInfo OmKey being updated for count
- */
- private void handlePutKeyEvent(OmKeyInfo omKeyInfo,
- Map<FileSizeCountKey, Long> fileSizeCountMap)
{
- FileSizeCountKey key = getFileSizeCountKey(omKeyInfo);
- Long count = fileSizeCountMap.containsKey(key) ?
- fileSizeCountMap.get(key) + 1L : 1L;
- fileSizeCountMap.put(key, count);
- }
-
- private BucketLayout getBucketLayout() {
- return BucketLayout.DEFAULT;
- }
-
- /**
- * Calculate and update the count of files being tracked by
- * fileSizeCountMap.
- * Used by reprocess() and process().
- *
- * @param omKeyInfo OmKey being updated for count
- */
- private void handleDeleteKeyEvent(String key, OmKeyInfo omKeyInfo,
- Map<FileSizeCountKey, Long>
- fileSizeCountMap) {
- if (omKeyInfo == null) {
- LOG.warn("Deleting a key not found while handling DELETE key event. Key"
+
- " not found in Recon OM DB : {}", key);
- } else {
- FileSizeCountKey countKey = getFileSizeCountKey(omKeyInfo);
- Long count = fileSizeCountMap.containsKey(countKey) ?
- fileSizeCountMap.get(countKey) - 1L : -1L;
- fileSizeCountMap.put(countKey, count);
- }
- }
-
- /**
- * Checks if the FILE_COUNT_BY_SIZE table is empty.
- *
- * @return true if the table is empty, false otherwise.
- */
- private boolean isFileCountBySizeTableEmpty() {
- return dslContext.fetchCount(FILE_COUNT_BY_SIZE) == 0;
- }
-
- private static class FileSizeCountKey {
- private String volume;
- private String bucket;
- private Long fileSizeUpperBound;
-
- FileSizeCountKey(String volume, String bucket,
- Long fileSizeUpperBound) {
- this.volume = volume;
- this.bucket = bucket;
- this.fileSizeUpperBound = fileSizeUpperBound;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof FileSizeCountKey) {
- FileSizeCountKey s = (FileSizeCountKey) obj;
- return volume.equals(s.volume) && bucket.equals(s.bucket) &&
- fileSizeUpperBound.equals(s.fileSizeUpperBound);
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return (volume + bucket + fileSizeUpperBound).hashCode();
- }
- }
-}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java
index f40a859590..a411444780 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.ozone.recon.tasks;
import com.google.inject.Inject;
-import org.apache.commons.lang3.tuple.Pair;
+import java.util.Map;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -42,7 +42,7 @@ public FileSizeCountTaskFSO(FileCountBySizeDao
fileCountBySizeDao,
}
@Override
- public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+ public TaskResult reprocess(OMMetadataManager omMetadataManager) {
return FileSizeCountTaskHelper.reprocess(
omMetadataManager,
dslContext,
@@ -53,7 +53,7 @@ public Pair<String, Boolean> reprocess(OMMetadataManager
omMetadataManager) {
}
@Override
- public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+ public TaskResult process(OMUpdateEventBatch events, Map<String, Integer>
subTaskSeekPosMap) {
// This task listens only on the FILE_TABLE.
return FileSizeCountTaskHelper.processEvents(
events,
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
index 489449d6a9..406ad2e953 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
@@ -25,8 +25,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -85,11 +83,11 @@ public static void truncateTableIfNeeded(DSLContext
dslContext) {
* @param taskName The name of the task for logging.
* @return A Pair of task name and boolean indicating success.
*/
- public static Pair<String, Boolean> reprocess(OMMetadataManager
omMetadataManager,
- DSLContext dslContext,
- FileCountBySizeDao
fileCountBySizeDao,
- BucketLayout bucketLayout,
- String taskName) {
+ public static ReconOmTask.TaskResult reprocess(OMMetadataManager
omMetadataManager,
+ DSLContext dslContext,
+ FileCountBySizeDao
fileCountBySizeDao,
+ BucketLayout bucketLayout,
+ String taskName) {
LOG.info("Starting Reprocess for {}", taskName);
Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
long startTime = System.currentTimeMillis();
@@ -97,12 +95,12 @@ public static Pair<String, Boolean>
reprocess(OMMetadataManager omMetadataManage
boolean status = reprocessBucketLayout(
bucketLayout, omMetadataManager, fileSizeCountMap, dslContext,
fileCountBySizeDao, taskName);
if (!status) {
- return new ImmutablePair<>(taskName, false);
+ return buildTaskResult(taskName, false);
}
writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao);
long endTime = System.currentTimeMillis();
LOG.info("{} completed Reprocess in {} ms.", taskName, (endTime -
startTime));
- return new ImmutablePair<>(taskName, true);
+ return buildTaskResult(taskName, true);
}
/**
@@ -155,11 +153,11 @@ public static boolean reprocessBucketLayout(BucketLayout
bucketLayout,
* @param taskName The name of the task for logging.
* @return A Pair of task name and boolean indicating success.
*/
- public static Pair<String, Boolean> processEvents(OMUpdateEventBatch events,
- String tableName,
- DSLContext dslContext,
- FileCountBySizeDao
fileCountBySizeDao,
- String taskName) {
+ public static ReconOmTask.TaskResult processEvents(OMUpdateEventBatch events,
+ String tableName,
+ DSLContext dslContext,
+ FileCountBySizeDao
fileCountBySizeDao,
+ String taskName) {
Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
long startTime = System.currentTimeMillis();
@@ -195,7 +193,7 @@ public static Pair<String, Boolean>
processEvents(OMUpdateEventBatch events,
}
} catch (Exception e) {
LOG.error("Unexpected exception while processing key {}.",
updatedKey, e);
- return new ImmutablePair<>(taskName, false);
+ return buildTaskResult(taskName, false);
}
} else {
LOG.warn("Unexpected value type {} for key {}. Skipping processing.",
@@ -205,7 +203,7 @@ public static Pair<String, Boolean>
processEvents(OMUpdateEventBatch events,
writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao);
LOG.debug("{} successfully processed in {} milliseconds", taskName,
(System.currentTimeMillis() - startTime));
- return new ImmutablePair<>(taskName, true);
+ return buildTaskResult(taskName, true);
}
/**
@@ -328,4 +326,11 @@ public int hashCode() {
return (volume + bucket + fileSizeUpperBound).hashCode();
}
}
+
+ public static ReconOmTask.TaskResult buildTaskResult(String taskName,
boolean success) {
+ return new ReconOmTask.TaskResult.Builder()
+ .setTaskName(taskName)
+ .setTaskSuccess(success)
+ .build();
+ }
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java
index acaab763ac..05cd0e1669 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.ozone.recon.tasks;
import com.google.inject.Inject;
-import org.apache.commons.lang3.tuple.Pair;
+import java.util.Map;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -42,7 +42,7 @@ public FileSizeCountTaskOBS(FileCountBySizeDao
fileCountBySizeDao,
}
@Override
- public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+ public TaskResult reprocess(OMMetadataManager omMetadataManager) {
return FileSizeCountTaskHelper.reprocess(
omMetadataManager,
dslContext,
@@ -53,7 +53,7 @@ public Pair<String, Boolean> reprocess(OMMetadataManager
omMetadataManager) {
}
@Override
- public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+ public TaskResult process(OMUpdateEventBatch events, Map<String, Integer>
subTaskSeekPosMap) {
// This task listens only on the KEY_TABLE.
return FileSizeCountTaskHelper.processEvents(
events,
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java
index 2150800350..aa2a94caf1 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java
@@ -17,11 +17,16 @@
package org.apache.hadoop.ozone.recon.tasks;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT;
+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -30,7 +35,6 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
-import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -39,6 +43,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Task to query data from OMDB and write into Recon RocksDB.
* Reprocess() will take a snapshots on OMDB, and iterate the keyTable,
@@ -69,7 +74,6 @@ public class NSSummaryTask implements ReconOmTask {
private final NSSummaryTaskWithFSO nsSummaryTaskWithFSO;
private final NSSummaryTaskWithLegacy nsSummaryTaskWithLegacy;
private final NSSummaryTaskWithOBS nsSummaryTaskWithOBS;
- private final OzoneConfiguration ozoneConfiguration;
@Inject
public NSSummaryTask(ReconNamespaceSummaryManager
@@ -80,16 +84,19 @@ public NSSummaryTask(ReconNamespaceSummaryManager
ozoneConfiguration) {
this.reconNamespaceSummaryManager = reconNamespaceSummaryManager;
this.reconOMMetadataManager = reconOMMetadataManager;
- this.ozoneConfiguration = ozoneConfiguration;
+ long nsSummaryFlushToDBMaxThreshold = ozoneConfiguration.getLong(
+ OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD,
+ OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT);
+
this.nsSummaryTaskWithFSO = new NSSummaryTaskWithFSO(
- reconNamespaceSummaryManager,
- reconOMMetadataManager, ozoneConfiguration);
+ reconNamespaceSummaryManager, reconOMMetadataManager,
+ ozoneConfiguration, nsSummaryFlushToDBMaxThreshold);
this.nsSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy(
- reconNamespaceSummaryManager,
- reconOMMetadataManager, ozoneConfiguration);
+ reconNamespaceSummaryManager, reconOMMetadataManager,
+ ozoneConfiguration, nsSummaryFlushToDBMaxThreshold);
this.nsSummaryTaskWithOBS = new NSSummaryTaskWithOBS(
- reconNamespaceSummaryManager,
- reconOMMetadataManager, ozoneConfiguration);
+ reconNamespaceSummaryManager, reconOMMetadataManager,
+ ozoneConfiguration, nsSummaryFlushToDBMaxThreshold);
}
@Override
@@ -97,28 +104,68 @@ public String getTaskName() {
return "NSSummaryTask";
}
+ /**
+ * Bucket type enum; each constant corresponds to one subtask that does the data processing.
+ */
+ public enum BucketType {
+ FSO("File System Optimized Bucket"),
+ OBS("Object Store Bucket"),
+ LEGACY("Legacy Bucket");
+
+ private final String description;
+
+ BucketType(String description) {
+ this.description = description;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+ }
+
@Override
- public Pair<String, Boolean> process(OMUpdateEventBatch events) {
- long startTime = System.currentTimeMillis();
- boolean success = nsSummaryTaskWithFSO.processWithFSO(events);
- if (!success) {
+ public TaskResult process(
+ OMUpdateEventBatch events, Map<String, Integer> subTaskSeekPosMap) {
+ boolean anyFailure = false; // Track if any bucket fails
+ Map<String, Integer> updatedSeekPositions = new HashMap<>();
+
+ // Process FSO bucket
+ Integer bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.FSO.name(),
0);
+ Pair<Integer, Boolean> bucketResult =
nsSummaryTaskWithFSO.processWithFSO(events, bucketSeek);
+ updatedSeekPositions.put(BucketType.FSO.name(), bucketResult.getLeft());
+ if (!bucketResult.getRight()) {
LOG.error("processWithFSO failed.");
+ anyFailure = true;
}
- success = nsSummaryTaskWithLegacy.processWithLegacy(events);
- if (!success) {
+
+ // Process Legacy bucket
+ bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.LEGACY.name(), 0);
+ bucketResult = nsSummaryTaskWithLegacy.processWithLegacy(events,
bucketSeek);
+ updatedSeekPositions.put(BucketType.LEGACY.name(), bucketResult.getLeft());
+ if (!bucketResult.getRight()) {
LOG.error("processWithLegacy failed.");
+ anyFailure = true;
}
- success = nsSummaryTaskWithOBS.processWithOBS(events);
- if (!success) {
+
+ // Process OBS bucket
+ bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.OBS.name(), 0);
+ bucketResult = nsSummaryTaskWithOBS.processWithOBS(events, bucketSeek);
+ updatedSeekPositions.put(BucketType.OBS.name(), bucketResult.getLeft());
+ if (!bucketResult.getRight()) {
LOG.error("processWithOBS failed.");
+ anyFailure = true;
}
- LOG.debug("{} successfully processed in {} milliseconds",
- getTaskName(), (System.currentTimeMillis() - startTime));
- return new ImmutablePair<>(getTaskName(), success);
+
+ // Return task failure if any bucket failed, while keeping each bucket's
latest seek position
+ return new TaskResult.Builder()
+ .setTaskName(getTaskName())
+ .setSubTaskSeekPositions(updatedSeekPositions)
+ .setTaskSuccess(!anyFailure)
+ .build();
}
@Override
- public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+ public TaskResult reprocess(OMMetadataManager omMetadataManager) {
// Initialize a list of tasks to run in parallel
Collection<Callable<Boolean>> tasks = new ArrayList<>();
@@ -130,7 +177,7 @@ public Pair<String, Boolean> reprocess(OMMetadataManager
omMetadataManager) {
} catch (IOException ioEx) {
LOG.error("Unable to clear NSSummary table in Recon DB. ",
ioEx);
- return new ImmutablePair<>(getTaskName(), false);
+ return buildTaskResult(false);
}
tasks.add(() -> nsSummaryTaskWithFSO
@@ -150,15 +197,12 @@ public Pair<String, Boolean> reprocess(OMMetadataManager
omMetadataManager) {
results = executorService.invokeAll(tasks);
for (int i = 0; i < results.size(); i++) {
if (results.get(i).get().equals(false)) {
- return new ImmutablePair<>(getTaskName(), false);
+ return buildTaskResult(false);
}
}
- } catch (InterruptedException ex) {
+ } catch (InterruptedException | ExecutionException ex) {
LOG.error("Error while reprocessing NSSummary table in Recon DB.", ex);
- return new ImmutablePair<>(getTaskName(), false);
- } catch (ExecutionException ex2) {
- LOG.error("Error while reprocessing NSSummary table in Recon DB.", ex2);
- return new ImmutablePair<>(getTaskName(), false);
+ return buildTaskResult(false);
} finally {
executorService.shutdown();
@@ -171,7 +215,7 @@ public Pair<String, Boolean> reprocess(OMMetadataManager
omMetadataManager) {
LOG.debug("Task execution time: {} milliseconds", durationInMillis);
}
- return new ImmutablePair<>(getTaskName(), true);
+ return buildTaskResult(true);
}
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java
index da223665c3..4b0b851490 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java
@@ -71,15 +71,16 @@ public ReconOMMetadataManager getReconOMMetadataManager() {
protected void writeNSSummariesToDB(Map<Long, NSSummary> nsSummaryMap)
throws IOException {
try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) {
- nsSummaryMap.keySet().forEach((Long key) -> {
+ for (Map.Entry<Long, NSSummary> entry : nsSummaryMap.entrySet()) {
try {
reconNamespaceSummaryManager.batchStoreNSSummaries(rdbBatchOperation,
- key, nsSummaryMap.get(key));
+ entry.getKey(), entry.getValue());
} catch (IOException e) {
LOG.error("Unable to write Namespace Summary data in Recon DB.",
e);
+ throw e;
}
- });
+ }
reconNamespaceSummaryManager.commitBatchOperation(rdbBatchOperation);
}
}
@@ -201,20 +202,11 @@ protected void handleDeleteDirEvent(OmDirectoryInfo
directoryInfo,
protected boolean flushAndCommitNSToDB(Map<Long, NSSummary> nsSummaryMap) {
try {
writeNSSummariesToDB(nsSummaryMap);
- nsSummaryMap.clear();
} catch (IOException e) {
LOG.error("Unable to write Namespace Summary data in Recon DB.", e);
return false;
- }
- return true;
- }
-
- protected boolean checkAndCallFlushToDB(
- Map<Long, NSSummary> nsSummaryMap) {
- // if map contains more than entries, flush to DB and clear the map
- if (null != nsSummaryMap && nsSummaryMap.size() >=
- nsSummaryFlushToDBMaxThreshold) {
- return flushAndCommitNSToDB(nsSummaryMap);
+ } finally {
+ nsSummaryMap.clear();
}
return true;
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java
index 030574e0d6..6ebc36331a 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java
@@ -26,6 +26,8 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -47,14 +49,18 @@ public class NSSummaryTaskWithFSO extends
NSSummaryTaskDbEventHandler {
private static final Logger LOG =
LoggerFactory.getLogger(NSSummaryTaskWithFSO.class);
+ private final long nsSummaryFlushToDBMaxThreshold;
+
public NSSummaryTaskWithFSO(ReconNamespaceSummaryManager
reconNamespaceSummaryManager,
ReconOMMetadataManager
reconOMMetadataManager,
OzoneConfiguration
- ozoneConfiguration) {
+ ozoneConfiguration,
+ long nsSummaryFlushToDBMaxThreshold) {
super(reconNamespaceSummaryManager,
reconOMMetadataManager, ozoneConfiguration);
+ this.nsSummaryFlushToDBMaxThreshold = nsSummaryFlushToDBMaxThreshold;
}
// We only listen to updates from FSO-enabled KeyTable(FileTable) and
DirTable
@@ -62,15 +68,23 @@ public Collection<String> getTaskTables() {
return Arrays.asList(FILE_TABLE, DIRECTORY_TABLE);
}
- public boolean processWithFSO(OMUpdateEventBatch events) {
+ public Pair<Integer, Boolean> processWithFSO(OMUpdateEventBatch events,
+ int seekPos) {
Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
+ int itrPos = 0;
+ while (eventIterator.hasNext() && itrPos < seekPos) {
+ eventIterator.next();
+ itrPos++;
+ }
final Collection<String> taskTables = getTaskTables();
Map<Long, NSSummary> nsSummaryMap = new HashMap<>();
+ int eventCounter = 0;
while (eventIterator.hasNext()) {
OMDBUpdateEvent<String, ? extends
WithParentObjectId> omdbUpdateEvent = eventIterator.next();
OMDBUpdateEvent.OMDBUpdateAction action = omdbUpdateEvent.getAction();
+ eventCounter++;
// we only process updates on OM's FileTable and Dirtable
String table = omdbUpdateEvent.getTable();
@@ -149,20 +163,23 @@ public boolean processWithFSO(OMUpdateEventBatch events) {
} catch (IOException ioEx) {
LOG.error("Unable to process Namespace Summary data in Recon DB. ",
ioEx);
- return false;
+ nsSummaryMap.clear();
+ return new ImmutablePair<>(seekPos, false);
}
- if (!checkAndCallFlushToDB(nsSummaryMap)) {
- return false;
+ if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
+ if (!flushAndCommitNSToDB(nsSummaryMap)) {
+ return new ImmutablePair<>(seekPos, false);
+ }
+ seekPos = eventCounter + 1;
}
}
// flush and commit left out entries at end
if (!flushAndCommitNSToDB(nsSummaryMap)) {
- return false;
+ return new ImmutablePair<>(seekPos, false);
}
-
LOG.debug("Completed a process run of NSSummaryTaskWithFSO");
- return true;
+ return new ImmutablePair<>(seekPos, true);
}
public boolean reprocessWithFSO(OMMetadataManager omMetadataManager) {
@@ -178,8 +195,10 @@ public boolean reprocessWithFSO(OMMetadataManager
omMetadataManager) {
Table.KeyValue<String, OmDirectoryInfo> kv = dirTableIter.next();
OmDirectoryInfo directoryInfo = kv.getValue();
handlePutDirEvent(directoryInfo, nsSummaryMap);
- if (!checkAndCallFlushToDB(nsSummaryMap)) {
- return false;
+ if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
+ if (!flushAndCommitNSToDB(nsSummaryMap)) {
+ return false;
+ }
}
}
}
@@ -194,8 +213,10 @@ public boolean reprocessWithFSO(OMMetadataManager
omMetadataManager) {
Table.KeyValue<String, OmKeyInfo> kv = keyTableIter.next();
OmKeyInfo keyInfo = kv.getValue();
handlePutKeyEvent(keyInfo, nsSummaryMap);
- if (!checkAndCallFlushToDB(nsSummaryMap)) {
- return false;
+ if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
+ if (!flushAndCommitNSToDB(nsSummaryMap)) {
+ return false;
+ }
}
}
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java
index cf29f23813..a146003917 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java
@@ -25,6 +25,8 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -51,24 +53,35 @@ public class NSSummaryTaskWithLegacy extends
NSSummaryTaskDbEventHandler {
private static final Logger LOG =
LoggerFactory.getLogger(NSSummaryTaskWithLegacy.class);
- private boolean enableFileSystemPaths;
+ private final boolean enableFileSystemPaths;
+ private final long nsSummaryFlushToDBMaxThreshold;
public NSSummaryTaskWithLegacy(ReconNamespaceSummaryManager
reconNamespaceSummaryManager,
ReconOMMetadataManager
reconOMMetadataManager,
OzoneConfiguration
- ozoneConfiguration) {
+ ozoneConfiguration,
+ long nsSummaryFlushToDBMaxThreshold) {
super(reconNamespaceSummaryManager,
reconOMMetadataManager, ozoneConfiguration);
// true if FileSystemPaths enabled
enableFileSystemPaths = ozoneConfiguration
.getBoolean(OmConfig.Keys.ENABLE_FILESYSTEM_PATHS,
OmConfig.Defaults.ENABLE_FILESYSTEM_PATHS);
+ this.nsSummaryFlushToDBMaxThreshold = nsSummaryFlushToDBMaxThreshold;
}
- public boolean processWithLegacy(OMUpdateEventBatch events) {
+ public Pair<Integer, Boolean> processWithLegacy(OMUpdateEventBatch events,
+ int seekPos) {
Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
+ int itrPos = 0;
+ while (eventIterator.hasNext() && itrPos < seekPos) {
+ eventIterator.next();
+ itrPos++;
+ }
+
+ int eventCounter = 0;
Map<Long, NSSummary> nsSummaryMap = new HashMap<>();
ReconOMMetadataManager metadataManager = getReconOMMetadataManager();
@@ -76,6 +89,7 @@ public boolean processWithLegacy(OMUpdateEventBatch events) {
OMDBUpdateEvent<String, ? extends WithParentObjectId> omdbUpdateEvent =
eventIterator.next();
OMDBUpdateEvent.OMDBUpdateAction action = omdbUpdateEvent.getAction();
+ eventCounter++;
// we only process updates on OM's KeyTable
String table = omdbUpdateEvent.getTable();
@@ -114,20 +128,24 @@ public boolean processWithLegacy(OMUpdateEventBatch
events) {
} catch (IOException ioEx) {
LOG.error("Unable to process Namespace Summary data in Recon DB. ",
ioEx);
- return false;
+ nsSummaryMap.clear();
+ return new ImmutablePair<>(seekPos, false);
}
- if (!checkAndCallFlushToDB(nsSummaryMap)) {
- return false;
+ if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
+ if (!flushAndCommitNSToDB(nsSummaryMap)) {
+ return new ImmutablePair<>(seekPos, false);
+ }
+ seekPos = eventCounter + 1;
}
}
// flush and commit left out entries at end
if (!flushAndCommitNSToDB(nsSummaryMap)) {
- return false;
+ return new ImmutablePair<>(seekPos, false);
}
LOG.debug("Completed a process run of NSSummaryTaskWithLegacy");
- return true;
+ return new ImmutablePair<>(seekPos, true);
}
private void processWithFileSystemLayout(OmKeyInfo updatedKeyInfo,
@@ -278,14 +296,17 @@ public boolean reprocessWithLegacy(OMMetadataManager
omMetadataManager) {
setParentBucketId(keyInfo);
handlePutKeyEvent(keyInfo, nsSummaryMap);
}
- if (!checkAndCallFlushToDB(nsSummaryMap)) {
- return false;
+ if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
+ if (!flushAndCommitNSToDB(nsSummaryMap)) {
+ return false;
+ }
}
}
}
} catch (IOException ioEx) {
LOG.error("Unable to reprocess Namespace Summary data in Recon DB. ",
ioEx);
+ nsSummaryMap.clear();
return false;
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java
index 7364639d47..e15cc2836f 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java
@@ -23,6 +23,8 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -47,13 +49,17 @@ public class NSSummaryTaskWithOBS extends
NSSummaryTaskDbEventHandler {
private static final Logger LOG =
LoggerFactory.getLogger(NSSummaryTaskWithOBS.class);
+ private final long nsSummaryFlushToDBMaxThreshold;
+
public NSSummaryTaskWithOBS(
ReconNamespaceSummaryManager reconNamespaceSummaryManager,
ReconOMMetadataManager reconOMMetadataManager,
- OzoneConfiguration ozoneConfiguration) {
+ OzoneConfiguration ozoneConfiguration,
+ long nsSummaryFlushToDBMaxThreshold) {
super(reconNamespaceSummaryManager,
reconOMMetadataManager, ozoneConfiguration);
+ this.nsSummaryFlushToDBMaxThreshold = nsSummaryFlushToDBMaxThreshold;
}
@@ -89,14 +95,17 @@ public boolean reprocessWithOBS(OMMetadataManager
omMetadataManager) {
setKeyParentID(keyInfo);
handlePutKeyEvent(keyInfo, nsSummaryMap);
- if (!checkAndCallFlushToDB(nsSummaryMap)) {
- return false;
+ if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
+ if (!flushAndCommitNSToDB(nsSummaryMap)) {
+ return false;
+ }
}
}
}
} catch (IOException ioEx) {
LOG.error("Unable to reprocess Namespace Summary data in Recon DB. ",
ioEx);
+ nsSummaryMap.clear();
return false;
}
@@ -108,14 +117,23 @@ public boolean reprocessWithOBS(OMMetadataManager
omMetadataManager) {
return true;
}
- public boolean processWithOBS(OMUpdateEventBatch events) {
+ public Pair<Integer, Boolean> processWithOBS(OMUpdateEventBatch events,
+ int seekPos) {
Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
Map<Long, NSSummary> nsSummaryMap = new HashMap<>();
+ int itrPos = 0;
+ while (eventIterator.hasNext() && itrPos < seekPos) {
+ eventIterator.next();
+ itrPos++;
+ }
+
+ int eventCounter = 0;
while (eventIterator.hasNext()) {
OMDBUpdateEvent<String, ? extends WithParentObjectId> omdbUpdateEvent =
eventIterator.next();
OMDBUpdateEvent.OMDBUpdateAction action = omdbUpdateEvent.getAction();
+ eventCounter++;
// We only process updates on OM's KeyTable
String table = omdbUpdateEvent.getTable();
@@ -181,27 +199,27 @@ public boolean processWithOBS(OMUpdateEventBatch events) {
default:
LOG.debug("Skipping DB update event: {}", action);
}
-
- if (!checkAndCallFlushToDB(nsSummaryMap)) {
- return false;
+ if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
+ if (!flushAndCommitNSToDB(nsSummaryMap)) {
+ return new ImmutablePair<>(seekPos, false);
+ }
+ seekPos = eventCounter + 1;
}
} catch (IOException ioEx) {
LOG.error("Unable to process Namespace Summary data in Recon DB. ",
ioEx);
- return false;
- }
- if (!checkAndCallFlushToDB(nsSummaryMap)) {
- return false;
+ nsSummaryMap.clear();
+ return new ImmutablePair<>(seekPos, false);
}
}
// Flush and commit left-out entries at the end
if (!flushAndCommitNSToDB(nsSummaryMap)) {
- return false;
+ return new ImmutablePair<>(seekPos, false);
}
LOG.debug("Completed a process run of NSSummaryTaskWithOBS");
- return true;
+ return new ImmutablePair<>(seekPos, true);
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
index 72d5906b96..6019990158 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
@@ -36,8 +36,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -99,7 +97,7 @@ public void init() {
/**
* Iterates the rows of each table in the OM snapshot DB and calculates the
* counts and sizes for table data.
- *
+ * <p>
* For tables that require data size calculation
* (as returned by getTablesToCalculateSize), both the number of
* records (count) and total data size of the records are calculated.
@@ -109,7 +107,7 @@ public void init() {
* @return Pair
*/
@Override
- public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+ public TaskResult reprocess(OMMetadataManager omMetadataManager) {
init();
for (String tableName : tables) {
Table table = omMetadataManager.getTable(tableName);
@@ -131,7 +129,7 @@ public Pair<String, Boolean> reprocess(OMMetadataManager
omMetadataManager) {
}
} catch (IOException ioEx) {
LOG.error("Unable to populate Table Count in Recon DB.", ioEx);
- return new ImmutablePair<>(getTaskName(), false);
+ return buildTaskResult(false);
}
}
// Write the data to the DB
@@ -146,7 +144,7 @@ public Pair<String, Boolean> reprocess(OMMetadataManager
omMetadataManager) {
}
LOG.debug("Completed a 'reprocess' run of OmTableInsightTask.");
- return new ImmutablePair<>(getTaskName(), true);
+ return buildTaskResult(true);
}
@Override
@@ -162,11 +160,13 @@ public Collection<String> getTaskTables() {
* Read the update events and update the count and sizes of respective object
* (volume, bucket, key etc.) based on the action (put or delete).
*
- * @param events Update events - PUT, DELETE and UPDATE.
+ * @param events Update events - PUT, DELETE and UPDATE.
+ * @param subTaskSeekPosMap Seek positions for each sub-task, keyed by sub-task name.
* @return Pair
*/
@Override
- public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+ public TaskResult process(OMUpdateEventBatch events,
+ Map<String, Integer> subTaskSeekPosMap) {
Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
String tableName;
@@ -201,7 +201,7 @@ public Pair<String, Boolean> process(OMUpdateEventBatch
events) {
LOG.error(
"Unexpected exception while processing the table {}, Action: {}",
tableName, omdbUpdateEvent.getAction(), e);
- return new ImmutablePair<>(getTaskName(), false);
+ return buildTaskResult(false);
}
}
// Write the updated count and size information to the database
@@ -216,7 +216,7 @@ public Pair<String, Boolean> process(OMUpdateEventBatch
events) {
}
LOG.debug("{} successfully processed in {} milliseconds",
getTaskName(), (System.currentTimeMillis() - startTime));
- return new ImmutablePair<>(getTaskName(), true);
+ return buildTaskResult(true);
}
private void handlePutEvent(OMDBUpdateEvent<String, Object> event,
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconOmTask.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconOmTask.java
index 01079d529b..395fdf6b1e 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconOmTask.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconOmTask.java
@@ -17,7 +17,8 @@
package org.apache.hadoop.ozone.recon.tasks;
-import org.apache.commons.lang3.tuple.Pair;
+import java.util.Collections;
+import java.util.Map;
import org.apache.hadoop.ozone.om.OMMetadataManager;
/**
@@ -38,16 +39,109 @@ default void init() { }
/**
* Process a set of OM events on tables that the task is listening on.
- * @param events Set of events to be processed by the task.
- * @return Pair of task name -> task success.
+ *
+ * @param events The batch of OM update events to be processed.
+ * @param subTaskSeekPosMap A map containing the seek positions for
+ * each sub-task, indicating where processing
should start.
+ * @return A {@link TaskResult} containing:
+ * - The task name.
+ * - A map of sub-task names to their respective seek positions.
+ * - A boolean indicating whether the task was successful.
*/
- Pair<String, Boolean> process(OMUpdateEventBatch events);
+ TaskResult process(OMUpdateEventBatch events,
+ Map<String, Integer> subTaskSeekPosMap);
/**
- * Process a on tables that the task is listening on.
- * @param omMetadataManager OM Metadata manager instance.
- * @return Pair of task name -> task success.
+ * Reprocesses all entries in the Recon OM RocksDB tables that the task is
+ * listening to.
+ *
+ * @param omMetadataManager The OM Metadata Manager instance used for
accessing metadata.
+ * @return A {@link TaskResult} containing:
+ * - The task name.
+ * - A map of sub-task names to their respective seek positions.
+ * - A boolean indicating whether the task was successful.
*/
- Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager);
+ TaskResult reprocess(OMMetadataManager omMetadataManager);
+ /**
+ * Represents the result of a task execution, including the task name,
+ * sub-task seek positions, and success status.
+ *
+ * <p>All fields are final and instances are created through the Builder.
+ * The seek-position map is stored by reference and is not copied.</p>
+ */
+ class TaskResult {
+ private final String taskName;
+ private final Map<String, Integer> subTaskSeekPositions;
+ private final boolean taskSuccess;
+
+ /**
+ * Private constructor to enforce the use of the {@link Builder}.
+ *
+ * @param builder The builder instance containing values for
initialization.
+ */
+ private TaskResult(Builder builder) {
+ this.taskName = builder.taskName;
+ this.subTaskSeekPositions = builder.subTaskSeekPositions != null
+ ? builder.subTaskSeekPositions
+ : Collections.emptyMap(); // Default value
+ this.taskSuccess = builder.taskSuccess;
+ }
+
+ // Getters
+ public String getTaskName() {
+ return taskName;
+ }
+
+ public Map<String, Integer> getSubTaskSeekPositions() {
+ return subTaskSeekPositions;
+ }
+
+ public boolean isTaskSuccess() {
+ return taskSuccess;
+ }
+
+ /**
+ * Builder class for creating instances of {@link TaskResult}.
+ */
+ public static class Builder {
+ private String taskName;
+ private Map<String, Integer> subTaskSeekPositions =
Collections.emptyMap(); // Default value
+ private boolean taskSuccess;
+
+ public Builder setTaskName(String taskName) {
+ this.taskName = taskName;
+ return this;
+ }
+
+ public Builder setSubTaskSeekPositions(Map<String, Integer>
subTaskSeekPositions) {
+ this.subTaskSeekPositions = subTaskSeekPositions;
+ return this;
+ }
+
+ public Builder setTaskSuccess(boolean taskSuccess) {
+ this.taskSuccess = taskSuccess;
+ return this;
+ }
+
+ public TaskResult build() {
+ return new TaskResult(this);
+ }
+ }
+
+ // toString Method for debugging
+ @Override
+ public String toString() {
+ return "TaskResult{" +
+ "taskName='" + taskName + '\'' +
+ ", subTaskSeekPositions=" + subTaskSeekPositions +
+ ", taskSuccess=" + taskSuccess +
+ '}';
+ }
+ }
+
+ default TaskResult buildTaskResult(boolean success) {
+ return new TaskResult.Builder()
+ .setTaskName(getTaskName())
+ .setTaskSuccess(success)
+ .build();
+ }
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
index c1d786db1a..e289b6ae15 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
@@ -24,6 +24,7 @@
import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -36,7 +37,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.recon.ReconConstants;
@@ -97,27 +97,27 @@ public void registerTask(ReconOmTask task) {
@Override
public synchronized void consumeOMEvents(OMUpdateEventBatch events,
OMMetadataManager omMetadataManager) {
if (!events.isEmpty()) {
- Collection<NamedCallableTask<Pair<String, Boolean>>> tasks = new
ArrayList<>();
- List<String> failedTasks = new ArrayList<>();
+ Collection<NamedCallableTask<ReconOmTask.TaskResult>> tasks = new
ArrayList<>();
+ List<ReconOmTask.TaskResult> failedTasks = new ArrayList<>();
for (Map.Entry<String, ReconOmTask> taskEntry :
reconOmTasks.entrySet()) {
ReconOmTask task = taskEntry.getValue();
ReconTaskStatusUpdater taskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName());
taskStatusUpdater.recordRunStart();
// events passed to process method is no longer filtered
- tasks.add(new NamedCallableTask<>(task.getTaskName(), () ->
task.process(events)));
+ tasks.add(new NamedCallableTask<>(task.getTaskName(), () ->
task.process(events, Collections.emptyMap())));
}
processTasks(tasks, events, failedTasks);
// Retry processing failed tasks
- List<String> retryFailedTasks = new ArrayList<>();
+ List<ReconOmTask.TaskResult> retryFailedTasks = new ArrayList<>();
if (!failedTasks.isEmpty()) {
tasks.clear();
- for (String taskName : failedTasks) {
- ReconOmTask task = reconOmTasks.get(taskName);
+ for (ReconOmTask.TaskResult taskResult : failedTasks) {
+ ReconOmTask task = reconOmTasks.get(taskResult.getTaskName());
// events passed to process method is no longer filtered
tasks.add(new NamedCallableTask<>(task.getTaskName(),
- () -> task.process(events)));
+ () -> task.process(events,
taskResult.getSubTaskSeekPositions())));
}
processTasks(tasks, events, retryFailedTasks);
}
@@ -126,12 +126,15 @@ public synchronized void
consumeOMEvents(OMUpdateEventBatch events, OMMetadataMa
ReconConstants.resetTableTruncatedFlags();
if (!retryFailedTasks.isEmpty()) {
tasks.clear();
- for (String taskName : failedTasks) {
- ReconOmTask task = reconOmTasks.get(taskName);
+ for (ReconOmTask.TaskResult taskResult : failedTasks) {
+ ReconOmTask task = reconOmTasks.get(taskResult.getTaskName());
tasks.add(new NamedCallableTask<>(task.getTaskName(), () ->
task.reprocess(omMetadataManager)));
}
- List<String> reprocessFailedTasks = new ArrayList<>();
+ List<ReconOmTask.TaskResult> reprocessFailedTasks = new ArrayList<>();
processTasks(tasks, events, reprocessFailedTasks);
+ // Assumption: if even a full reprocess of the task fails, something is
+ // wrong with the Recon RocksDB snapshot obtained from OM, and it needs
+ // to be investigated.
ignoreFailedTasks(reprocessFailedTasks);
}
}
@@ -141,8 +144,9 @@ public synchronized void consumeOMEvents(OMUpdateEventBatch
events, OMMetadataMa
* Ignore tasks that failed reprocess step more than threshold times.
* @param failedTasks list of failed tasks.
*/
- private void ignoreFailedTasks(List<String> failedTasks) {
- for (String taskName : failedTasks) {
+ private void ignoreFailedTasks(List<ReconOmTask.TaskResult> failedTasks) {
+ for (ReconOmTask.TaskResult taskResult : failedTasks) {
+ String taskName = taskResult.getTaskName();
LOG.info("Reprocess step failed for task {}.", taskName);
if (taskFailureCounter.get(taskName).incrementAndGet() >
TASK_FAILURE_THRESHOLD) {
@@ -155,14 +159,16 @@ private void ignoreFailedTasks(List<String> failedTasks) {
@Override
public synchronized void reInitializeTasks(ReconOMMetadataManager
omMetadataManager) {
- Collection<NamedCallableTask<Pair<String, Boolean>>> tasks = new
ArrayList<>();
+ Collection<NamedCallableTask<ReconOmTask.TaskResult>> tasks = new
ArrayList<>();
ReconConstants.resetTableTruncatedFlags();
for (Map.Entry<String, ReconOmTask> taskEntry :
reconOmTasks.entrySet()) {
ReconOmTask task = taskEntry.getValue();
- ReconTaskStatusUpdater taskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName());
+ ReconTaskStatusUpdater taskStatusUpdater =
+ taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName());
taskStatusUpdater.recordRunStart();
- tasks.add(new NamedCallableTask<>(task.getTaskName(), () ->
task.reprocess(omMetadataManager)));
+ tasks.add(new NamedCallableTask<>(task.getTaskName(),
+ () -> task.reprocess(omMetadataManager)));
}
try {
@@ -178,14 +184,16 @@ public synchronized void
reInitializeTasks(ReconOMMetadataManager omMetadataMana
throw new TaskExecutionException(task.getTaskName(), e);
}
}, executorService).thenAccept(result -> {
- String taskName = result.getLeft();
- ReconTaskStatusUpdater taskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
- if (!result.getRight()) {
+ String taskName = result.getTaskName();
+ ReconTaskStatusUpdater taskStatusUpdater =
+ taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
+ if (!result.isTaskSuccess()) {
LOG.error("Init failed for task {}.", taskName);
taskStatusUpdater.setLastTaskRunStatus(-1);
} else {
taskStatusUpdater.setLastTaskRunStatus(0);
-
taskStatusUpdater.setLastUpdatedSeqNumber(omMetadataManager.getLastSequenceNumberFromDB());
+ taskStatusUpdater.setLastUpdatedSeqNumber(
+ omMetadataManager.getLastSequenceNumberFromDB());
}
taskStatusUpdater.recordRunCompletion();
}).exceptionally(ex -> {
@@ -237,8 +245,9 @@ public synchronized void stop() {
* @param events A batch of {@link OMUpdateEventBatch} events to fetch
sequence number of last event in batch.
* @param failedTasks Reference of the list to which we want to add the
failed tasks for retry/reprocessing
*/
- private void processTasks(Collection<NamedCallableTask<Pair<String,
Boolean>>> tasks,
- OMUpdateEventBatch events, List<String>
failedTasks) {
+ private void processTasks(
+ Collection<NamedCallableTask<ReconOmTask.TaskResult>> tasks,
+ OMUpdateEventBatch events, List<ReconOmTask.TaskResult> failedTasks) {
List<CompletableFuture<Void>> futures = tasks.stream()
.map(task -> CompletableFuture.supplyAsync(() -> {
try {
@@ -251,11 +260,15 @@ private void
processTasks(Collection<NamedCallableTask<Pair<String, Boolean>>> t
throw new TaskExecutionException(task.getTaskName(), e);
}
}, executorService).thenAccept(result -> {
- String taskName = result.getLeft();
- ReconTaskStatusUpdater taskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
- if (!result.getRight()) {
+ String taskName = result.getTaskName();
+ ReconTaskStatusUpdater taskStatusUpdater =
+ taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
+ if (!result.isTaskSuccess()) {
LOG.error("Task {} failed", taskName);
- failedTasks.add(result.getLeft());
+ failedTasks.add(new ReconOmTask.TaskResult.Builder()
+ .setTaskName(taskName)
+ .setSubTaskSeekPositions(result.getSubTaskSeekPositions())
+ .build());
taskStatusUpdater.setLastTaskRunStatus(-1);
} else {
taskFailureCounter.get(taskName).set(0);
@@ -270,7 +283,8 @@ private void
processTasks(Collection<NamedCallableTask<Pair<String, Boolean>>> t
String taskName = taskEx.getTaskName();
LOG.error("The above error occurred while trying to execute task:
{}", taskName);
- ReconTaskStatusUpdater taskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
+ ReconTaskStatusUpdater taskStatusUpdater =
+ taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
taskStatusUpdater.setLastTaskRunStatus(-1);
taskStatusUpdater.recordRunCompletion();
}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
index 9efd3c9e99..a81c5d1833 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
@@ -477,7 +477,7 @@ public void testGetKeysForContainer() throws IOException {
setUpFSOData();
NSSummaryTaskWithFSO nSSummaryTaskWithFso =
new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
- reconOMMetadataManager, new OzoneConfiguration());
+ reconOMMetadataManager, new OzoneConfiguration(), 10);
nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
// Reprocess the container key mapper to ensure the latest mapping is used
reprocessContainerKeyMapper();
@@ -565,7 +565,7 @@ public void testGetKeysForContainerWithPrevKey() throws
IOException {
reprocessContainerKeyMapper();
NSSummaryTaskWithFSO nSSummaryTaskWithFso =
new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
- reconOMMetadataManager, new OzoneConfiguration());
+ reconOMMetadataManager, new OzoneConfiguration(), 10);
nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
response = containerEndpoint.getKeysForContainer(20L, -1, "/0/1/2/file7");
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
index 52ac1d64f4..9b16643d0f 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
@@ -66,7 +66,6 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -135,6 +134,7 @@
import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskFSO;
import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskOBS;
import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask;
+import org.apache.hadoop.ozone.recon.tasks.ReconOmTask;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.apache.ozone.test.LambdaTestUtils;
import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
@@ -789,9 +789,9 @@ public void testGetClusterState() throws Exception {
});
omTableInsightTask.init();
// check volume, bucket and key count after running table count task
- Pair<String, Boolean> result =
+ ReconOmTask.TaskResult result =
omTableInsightTask.reprocess(reconOMMetadataManager);
- assertTrue(result.getRight());
+ assertTrue(result.isTaskSuccess());
response = clusterStateEndpoint.getClusterState();
clusterStateResponse = (ClusterStateResponse) response.getEntity();
assertEquals(2, clusterStateResponse.getVolumes());
@@ -869,10 +869,10 @@ public void testGetFileCounts() throws Exception {
.thenReturn(omKeyInfo3);
// Call reprocess on both endpoints.
- Pair<String, Boolean> resultOBS =
fileSizeCountTaskOBS.reprocess(omMetadataManager);
- Pair<String, Boolean> resultFSO =
fileSizeCountTaskFSO.reprocess(omMetadataManager);
- assertTrue(resultOBS.getRight());
- assertTrue(resultFSO.getRight());
+ ReconOmTask.TaskResult resultOBS =
fileSizeCountTaskOBS.reprocess(omMetadataManager);
+ ReconOmTask.TaskResult resultFSO =
fileSizeCountTaskFSO.reprocess(omMetadataManager);
+ assertTrue(resultOBS.isTaskSuccess());
+ assertTrue(resultFSO.isTaskSuccess());
// The two tasks should result in 3 rows.
assertEquals(3, fileCountBySizeDao.count());
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryDiskUsageOrdering.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryDiskUsageOrdering.java
index 810c9096c9..84d0807a34 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryDiskUsageOrdering.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryDiskUsageOrdering.java
@@ -106,7 +106,7 @@ public void setUp() throws Exception {
populateOMDB();
NSSummaryTaskWithFSO nSSummaryTaskWithFso =
new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
- reconOMMetadataManager, ozoneConfiguration);
+ reconOMMetadataManager, ozoneConfiguration, 10);
nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java
index edb495421f..77d5bb69a5 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java
@@ -389,7 +389,7 @@ public void setUp() throws Exception {
populateOMDB();
NSSummaryTaskWithFSO nSSummaryTaskWithFso =
new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
- reconOMMetadataManager, ozoneConfiguration);
+ reconOMMetadataManager, ozoneConfiguration, 10);
nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
commonUtils = new CommonUtils();
}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithLegacy.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithLegacy.java
index 8dca6ed566..1a01e12543 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithLegacy.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithLegacy.java
@@ -386,7 +386,7 @@ public void setUp() throws Exception {
populateOMDB();
NSSummaryTaskWithLegacy nsSummaryTaskWithLegacy =
new NSSummaryTaskWithLegacy(reconNamespaceSummaryManager,
- reconOMMetadataManager, conf);
+ reconOMMetadataManager, conf, 10);
nsSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager);
commonUtils = new CommonUtils();
}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithOBSAndLegacy.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithOBSAndLegacy.java
index 7743b5aa39..a162f48d82 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithOBSAndLegacy.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithOBSAndLegacy.java
@@ -382,11 +382,11 @@ public void setUp() throws Exception {
populateOMDB();
NSSummaryTaskWithOBS nsSummaryTaskWithOBS =
new NSSummaryTaskWithOBS(reconNamespaceSummaryManager,
- reconOMMetadataManager, conf);
+ reconOMMetadataManager, conf, 10);
nsSummaryTaskWithOBS.reprocessWithOBS(reconOMMetadataManager);
NSSummaryTaskWithLegacy nsSummaryTaskWithLegacy =
new NSSummaryTaskWithLegacy(reconNamespaceSummaryManager,
- reconOMMetadataManager, conf);
+ reconOMMetadataManager, conf, 10);
nsSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager);
commonUtils = new CommonUtils();
}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
index 065c5ac2ca..26fb6fe21b 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
@@ -321,13 +321,13 @@ public void setUp() throws Exception {
setUpOmData();
nSSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy(
reconNamespaceSummaryManager,
- reconOMMetadataManager, ozoneConfiguration);
+ reconOMMetadataManager, ozoneConfiguration, 10);
nsSummaryTaskWithOBS = new NSSummaryTaskWithOBS(
reconNamespaceSummaryManager,
- reconOMMetadataManager, ozoneConfiguration);
+ reconOMMetadataManager, ozoneConfiguration, 10);
nsSummaryTaskWithFSO = new NSSummaryTaskWithFSO(
reconNamespaceSummaryManager,
- reconOMMetadataManager, ozoneConfiguration);
+ reconOMMetadataManager, ozoneConfiguration, 10);
reconNamespaceSummaryManager.clearNSSummaryTable();
nSSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager);
nsSummaryTaskWithOBS.reprocessWithOBS(reconOMMetadataManager);
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenKeysSearchEndpoint.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenKeysSearchEndpoint.java
index ce84eec418..5123619416 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenKeysSearchEndpoint.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenKeysSearchEndpoint.java
@@ -116,7 +116,7 @@ public void setUp() throws Exception {
populateOMDB();
NSSummaryTaskWithFSO nSSummaryTaskWithFso =
new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
- reconOMMetadataManager, ozoneConfiguration);
+ reconOMMetadataManager, ozoneConfiguration, 10);
nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java
index aec2b7d6e2..2fc1c320d1 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java
@@ -19,8 +19,7 @@
import java.util.Collection;
import java.util.Collections;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
+import java.util.Map;
import org.apache.hadoop.ozone.om.OMMetadataManager;
/**
@@ -52,20 +51,21 @@ public Collection<String> getTaskTables() {
}
@Override
- public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+ public TaskResult process(
+ OMUpdateEventBatch events, Map<String, Integer> seekPos) {
if (++callCtr <= numFailuresAllowed) {
- return new ImmutablePair<>(getTaskName(), false);
+ return buildTaskResult(false);
} else {
- return new ImmutablePair<>(getTaskName(), true);
+ return buildTaskResult(true);
}
}
@Override
- public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+ public TaskResult reprocess(OMMetadataManager omMetadataManager) {
if (++callCtr <= numFailuresAllowed) {
- return new ImmutablePair<>(getTaskName(), false);
+ return buildTaskResult(false);
} else {
- return new ImmutablePair<>(getTaskName(), true);
+ return buildTaskResult(true);
}
}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
index b78d970bac..36b335c1b4 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
@@ -336,7 +336,7 @@ public void testKeyTableProcess() throws IOException {
assertEquals(1, reconContainerMetadataManager.getKeyCountForContainer(3L));
// Process PUT & DELETE event.
- containerKeyMapperTask.process(omUpdateEventBatch);
+ containerKeyMapperTask.process(omUpdateEventBatch, Collections.emptyMap());
keyPrefixesForContainer = reconContainerMetadataManager
.getKeyPrefixesForContainer(1);
@@ -427,7 +427,7 @@ public void testFileTableProcess() throws Exception {
}, 0L);
// Process PUT event for both the keys
- containerKeyMapperTask.process(omUpdateEventBatch);
+ containerKeyMapperTask.process(omUpdateEventBatch, Collections.emptyMap());
keyPrefixesForContainer = reconContainerMetadataManager
.getKeyPrefixesForContainer(1);
@@ -460,7 +460,7 @@ public void testFileTableProcess() throws Exception {
}, 0L);
// Process DELETE event for key2
- containerKeyMapperTask.process(omUpdateEventBatch2);
+ containerKeyMapperTask.process(omUpdateEventBatch2,
Collections.emptyMap());
keyPrefixesForContainer = reconContainerMetadataManager
.getKeyPrefixesForContainer(1);
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
index 30ca22154b..fc2f152ac0 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
@@ -33,8 +33,8 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.utils.db.TypedTable;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
@@ -130,12 +130,12 @@ public void testReprocess() throws IOException {
fileCountBySizeDao.insert(new FileCountBySize("vol1", "bucket1", 1024L,
10L));
// Call reprocess on both tasks.
- Pair<String, Boolean> resultOBS =
fileSizeCountTaskOBS.reprocess(omMetadataManager);
- Pair<String, Boolean> resultFSO =
fileSizeCountTaskFSO.reprocess(omMetadataManager);
+ ReconOmTask.TaskResult resultOBS =
fileSizeCountTaskOBS.reprocess(omMetadataManager);
+ ReconOmTask.TaskResult resultFSO =
fileSizeCountTaskFSO.reprocess(omMetadataManager);
// Verify that both tasks reported success.
- assertTrue(resultOBS.getRight(), "OBS reprocess should return true");
- assertTrue(resultFSO.getRight(), "FSO reprocess should return true");
+ assertTrue(resultOBS.isTaskSuccess(), "OBS reprocess should return true");
+ assertTrue(resultFSO.isTaskSuccess(), "FSO reprocess should return true");
// After processing, there should be 3 rows (one per bin).
assertEquals(3, fileCountBySizeDao.count(), "Expected 3 rows in the DB");
@@ -197,8 +197,8 @@ public void testProcess() {
new OMUpdateEventBatch(Arrays.asList(event, event2), 0L);
// Process the same batch on both endpoints.
- fileSizeCountTaskOBS.process(omUpdateEventBatch);
- fileSizeCountTaskFSO.process(omUpdateEventBatch);
+ fileSizeCountTaskOBS.process(omUpdateEventBatch, Collections.emptyMap());
+ fileSizeCountTaskFSO.process(omUpdateEventBatch, Collections.emptyMap());
// After processing the first batch:
// Since each endpoint processes the same events, the counts are doubled.
@@ -256,8 +256,8 @@ public void testProcess() {
omUpdateEventBatch = new OMUpdateEventBatch(
Arrays.asList(updateEvent, putEvent, deleteEvent), 0L);
- fileSizeCountTaskOBS.process(omUpdateEventBatch);
- fileSizeCountTaskFSO.process(omUpdateEventBatch);
+ fileSizeCountTaskOBS.process(omUpdateEventBatch, Collections.emptyMap());
+ fileSizeCountTaskFSO.process(omUpdateEventBatch, Collections.emptyMap());
assertEquals(4, fileCountBySizeDao.count());
recordToFind.value3(1024L);
@@ -322,10 +322,10 @@ public void testReprocessAtScale() throws IOException {
when(mockKeyValueFso.getValue()).thenAnswer(returnsElementsOf(omKeyInfoList));
// Call reprocess on both endpoints.
- Pair<String, Boolean> resultOBS =
fileSizeCountTaskOBS.reprocess(omMetadataManager);
- Pair<String, Boolean> resultFSO =
fileSizeCountTaskFSO.reprocess(omMetadataManager);
- assertTrue(resultOBS.getRight());
- assertTrue(resultFSO.getRight());
+ ReconOmTask.TaskResult resultOBS =
fileSizeCountTaskOBS.reprocess(omMetadataManager);
+ ReconOmTask.TaskResult resultFSO =
fileSizeCountTaskFSO.reprocess(omMetadataManager);
+ assertTrue(resultOBS.isTaskSuccess());
+ assertTrue(resultFSO.isTaskSuccess());
// 2 volumes * 500 buckets * 42 bins = 42000 rows
assertEquals(42000, fileCountBySizeDao.count());
@@ -393,8 +393,8 @@ public void testProcessAtScale() {
OMUpdateEventBatch omUpdateEventBatch = new
OMUpdateEventBatch(omDbEventList, 0L);
// Process the same batch on both endpoints.
- fileSizeCountTaskOBS.process(omUpdateEventBatch);
- fileSizeCountTaskFSO.process(omUpdateEventBatch);
+ fileSizeCountTaskOBS.process(omUpdateEventBatch, Collections.emptyMap());
+ fileSizeCountTaskFSO.process(omUpdateEventBatch, Collections.emptyMap());
// Verify 2 keys are in correct bins.
assertEquals(10000, fileCountBySizeDao.count());
@@ -470,8 +470,8 @@ public void testProcessAtScale() {
}
omUpdateEventBatch = new OMUpdateEventBatch(omDbEventList, 0L);
- fileSizeCountTaskOBS.process(omUpdateEventBatch);
- fileSizeCountTaskFSO.process(omUpdateEventBatch);
+ fileSizeCountTaskOBS.process(omUpdateEventBatch, Collections.emptyMap());
+ fileSizeCountTaskFSO.process(omUpdateEventBatch, Collections.emptyMap());
assertEquals(10000, fileCountBySizeDao.count());
recordToFind = dslContext
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java
index e3241287ea..ea7efaddfd 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java
@@ -29,6 +29,7 @@
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Set;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -220,7 +221,7 @@ public class TestProcess {
@BeforeEach
public void setUp() throws IOException {
nSSummaryTask.reprocess(reconOMMetadataManager);
- nSSummaryTask.process(processEventBatch());
+ nSSummaryTask.process(processEventBatch(), Collections.emptyMap());
nsSummaryForBucket1 =
reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID);
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java
index eb38084b4d..83b7222405 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.recon.tasks;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getMockOzoneManagerServiceProviderWithFSO;
import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager;
@@ -27,12 +28,16 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -47,12 +52,14 @@
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
/**
* Test for NSSummaryTaskWithFSO.
@@ -118,7 +125,7 @@ public class TestNSSummaryTaskWithFSO {
void setUp(@TempDir File tmpDir) throws Exception {
ozoneConfiguration = new OzoneConfiguration();
ozoneConfiguration.setLong(OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD,
- 10);
+ 3);
omMetadataManager = initializeNewOmMetadataManager(new File(tmpDir, "om"));
OzoneManagerServiceProvider ozoneManagerServiceProvider =
getMockOzoneManagerServiceProviderWithFSO();
@@ -141,9 +148,11 @@ void setUp(@TempDir File tmpDir) throws Exception {
populateOMDB();
+ long nsSummaryFlushToDBMaxThreshold = ozoneConfiguration.getLong(
+ OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, 3);
nSSummaryTaskWithFso = new NSSummaryTaskWithFSO(
reconNamespaceSummaryManager, reconOMMetadataManager,
- ozoneConfiguration);
+ ozoneConfiguration, nsSummaryFlushToDBMaxThreshold);
}
/**
@@ -313,10 +322,12 @@ public class TestProcess {
private OMDBUpdateEvent keyEvent6;
private OMDBUpdateEvent keyEvent7;
+ private Pair<Integer, Boolean> result;
+
@BeforeEach
public void setUp() throws IOException {
nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
- nSSummaryTaskWithFso.processWithFSO(processEventBatch());
+ result = nSSummaryTaskWithFso.processWithFSO(processEventBatch(), 0);
}
private OMUpdateEventBatch processEventBatch() throws IOException {
@@ -511,6 +522,97 @@ public void testParentIdAfterProcessEventBatch() throws
IOException {
"DIR_FIVE's parent ID should match BUCKET_TWO_OBJECT_ID.");
}
+ @Test
+ void testProcessWithFSOFlushAfterThresholdAndSuccess() throws IOException {
+ // processWithFSO() already ran in setUp(); 'result' holds its outcome.
+
+ // Assertions
+ Assertions.assertNotNull(result, "Result should not be null");
+ // Why is seekPos 7? The flush threshold is 3 and the batch has 7 events,
+ // so nsSummaryMap is flushed twice inside the loop; at the second flush
+ // eventCounter is 6, and the last event (event7) is flushed after the
+ // loop as the remaining event. Every threshold-based flush sets seekPos
+ // to eventCounter + 1, so seekPos ends up as 7.
+ Assertions.assertEquals(7, result.getLeft(), "seekPos should be 7");
+ Assertions.assertTrue(result.getRight(), "The processing should succeed");
+ }
+
+ @Test
+ void testProcessWithFSOFlushAfterThresholdAndFailureOfLastElement()
+ throws NoSuchFieldException, IllegalAccessException {
+ // Use a Mockito mock of NSSummaryTaskWithFSO as the task under test
+ NSSummaryTaskWithFSO task = mock(NSSummaryTaskWithFSO.class);
+
+ // Set the value of nsSummaryFlushToDBMaxThreshold to 3 using reflection
+ Field thresholdField =
NSSummaryTaskWithFSO.class.getDeclaredField("nsSummaryFlushToDBMaxThreshold");
+ thresholdField.setAccessible(true);
+ thresholdField.set(task, 3);
+
+ ReconNamespaceSummaryManager mockReconNamespaceSummaryManager =
mock(ReconNamespaceSummaryManager.class);
+ Field managerField =
NSSummaryTaskDbEventHandler.class.getDeclaredField("reconNamespaceSummaryManager");
+ managerField.setAccessible(true);
+ managerField.set(task, mockReconNamespaceSummaryManager);
+
+ // Mock the OMUpdateEventBatch and its iterator
+ OMUpdateEventBatch events = mock(OMUpdateEventBatch.class);
+ Iterator<OMDBUpdateEvent> mockIterator = mock(Iterator.class);
+
+ Mockito.when(events.getIterator()).thenReturn(mockIterator);
+
+ // Mock OMDBUpdateEvent objects and their behavior
+ OMDBUpdateEvent<String, OmKeyInfo> event1 = mock(OMDBUpdateEvent.class);
+ OMDBUpdateEvent<String, OmKeyInfo> event2 = mock(OMDBUpdateEvent.class);
+ OMDBUpdateEvent<String, OmKeyInfo> event3 = mock(OMDBUpdateEvent.class);
+ OMDBUpdateEvent<String, OmKeyInfo> event4 = mock(OMDBUpdateEvent.class);
+
+ // Mock getAction() for each event
+
Mockito.when(event1.getAction()).thenReturn(OMDBUpdateEvent.OMDBUpdateAction.PUT);
+
Mockito.when(event2.getAction()).thenReturn(OMDBUpdateEvent.OMDBUpdateAction.PUT);
+
Mockito.when(event3.getAction()).thenReturn(OMDBUpdateEvent.OMDBUpdateAction.PUT);
+
Mockito.when(event4.getAction()).thenReturn(OMDBUpdateEvent.OMDBUpdateAction.PUT);
+
+ OmKeyInfo keyInfo1 = new
OmKeyInfo.Builder().setParentObjectID(1).setObjectID(2).setKeyName("key1")
+ .setBucketName("bucket1")
+ .setDataSize(1024).setVolumeName("volume1").build();
+ OmKeyInfo keyInfo2 = new
OmKeyInfo.Builder().setParentObjectID(1).setObjectID(3).setKeyName("key2")
+ .setBucketName("bucket1")
+ .setDataSize(1024).setVolumeName("volume1").build();
+ OmKeyInfo keyInfo3 = new
OmKeyInfo.Builder().setParentObjectID(1).setObjectID(3).setKeyName("key2")
+ .setBucketName("bucket1")
+ .setDataSize(1024).setVolumeName("volume1").build();
+ OmKeyInfo keyInfo4 = new
OmKeyInfo.Builder().setParentObjectID(1).setObjectID(3).setKeyName("key2")
+ .setBucketName("bucket1")
+ .setDataSize(1024).setVolumeName("volume1").build();
+ Mockito.when(event1.getValue()).thenReturn(keyInfo1);
+ Mockito.when(event2.getValue()).thenReturn(keyInfo2);
+ Mockito.when(event3.getValue()).thenReturn(keyInfo3);
+ Mockito.when(event4.getValue()).thenReturn(keyInfo4);
+
+ // Mock getTable() to return valid table name
+ Mockito.when(event1.getTable()).thenReturn(FILE_TABLE);
+ Mockito.when(event2.getTable()).thenReturn(FILE_TABLE);
+ Mockito.when(event3.getTable()).thenReturn(FILE_TABLE);
+ Mockito.when(event4.getTable()).thenReturn(FILE_TABLE);
+
+ // Mock iterator to return the events
+ Mockito.when(mockIterator.hasNext()).thenReturn(true, true, true, true,
false);
+ Mockito.when(mockIterator.next()).thenReturn(event1, event2, event3,
event4);
+
+ // Stub flushAndCommitNSToDB to return true, true and then false
+ NSSummaryTaskWithFSO taskSpy = Mockito.spy(task);
+
Mockito.doReturn(true).doReturn(true).doReturn(false).when(taskSpy).flushAndCommitNSToDB(Mockito.anyMap());
+
+ // Call the method under test
+ Pair<Integer, Boolean> result1 = taskSpy.processWithFSO(events, 0);
+
+ // Assertions
+ Assertions.assertNotNull(result1, "Result should not be null");
+ Assertions.assertEquals(0, result1.getLeft(), "seekPos should be 0");
+
+ // Verify interactions
+ Mockito.verify(mockIterator, Mockito.times(3)).next();
+ Mockito.verify(taskSpy,
Mockito.times(1)).flushAndCommitNSToDB(Mockito.anyMap());
+ }
}
/**
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacy.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacy.java
index ceb88a3656..4d1f58e671 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacy.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacy.java
@@ -23,6 +23,7 @@
import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDirToOm;
import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeKeyToOm;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -36,8 +37,8 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
-import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmConfig;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -142,9 +143,11 @@ void setUp(@TempDir File tmpDir) throws Exception {
populateOMDB();
+ long nsSummaryFlushToDBMaxThreshold = omConfiguration.getLong(
+ OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, 10);
nSSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy(
reconNamespaceSummaryManager,
- reconOMMetadataManager, omConfiguration);
+ reconOMMetadataManager, omConfiguration,
nsSummaryFlushToDBMaxThreshold);
}
/**
@@ -292,7 +295,7 @@ public class TestProcess {
@BeforeEach
public void setUp() throws IOException {
nSSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager);
- nSSummaryTaskWithLegacy.processWithLegacy(processEventBatch());
+ nSSummaryTaskWithLegacy.processWithLegacy(processEventBatch(), 0);
nsSummaryForBucket1 =
reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID);
@@ -689,8 +692,8 @@ private void initializeNewOmMetadataManager(
omConfiguration = new OzoneConfiguration();
omConfiguration.set(OZONE_OM_DB_DIRS,
omDbDir.getAbsolutePath());
- omConfiguration.set(OMConfigKeys
- .OZONE_OM_ENABLE_FILESYSTEM_PATHS, "true");
+ omConfiguration.set(OmConfig.Keys.ENABLE_FILESYSTEM_PATHS, "true");
+ omConfiguration.set(OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, "10");
omMetadataManager = new OmMetadataManagerImpl(
omConfiguration, null);
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacyOBSLayout.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacyOBSLayout.java
index 766478b871..48054d1eed 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacyOBSLayout.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacyOBSLayout.java
@@ -22,6 +22,7 @@
import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getMockOzoneManagerServiceProviderWithFSO;
import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeKeyToOm;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -115,6 +116,8 @@ void setUp(@TempDir File tmpDir) throws Exception {
ozoneConfiguration = new OzoneConfiguration();
ozoneConfiguration.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS,
false);
+ ozoneConfiguration.setLong(OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD,
+ 10);
ReconTestInjector reconTestInjector =
new ReconTestInjector.Builder(tmpDir)
@@ -132,9 +135,12 @@ void setUp(@TempDir File tmpDir) throws Exception {
populateOMDB();
+ long nsSummaryFlushToDBMaxThreshold = ozoneConfiguration.getLong(
+ OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, 10);
nSSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy(
reconNamespaceSummaryManager,
- reconOMMetadataManager, ozoneConfiguration);
+ reconOMMetadataManager, ozoneConfiguration,
+ nsSummaryFlushToDBMaxThreshold);
}
/**
@@ -240,7 +246,7 @@ public void setUp() throws IOException {
// reinit Recon RocksDB's namespace CF.
reconNamespaceSummaryManager.clearNSSummaryTable();
nSSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager);
- nSSummaryTaskWithLegacy.processWithLegacy(processEventBatch());
+ nSSummaryTaskWithLegacy.processWithLegacy(processEventBatch(), 0);
nsSummaryForBucket1 =
reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID);
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithOBS.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithOBS.java
index 0db53aee12..386a5539f1 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithOBS.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithOBS.java
@@ -22,6 +22,8 @@
import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getMockOzoneManagerServiceProviderWithFSO;
import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeKeyToOm;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -34,8 +36,8 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
-import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmConfig;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -126,9 +128,13 @@ void setUp(@TempDir File tmpDir) throws Exception {
populateOMDB();
+ long nsSummaryFlushToDBMaxThreshold = omConfiguration.getLong(
+ OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD,
+ OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT);
nSSummaryTaskWithOBS = new NSSummaryTaskWithOBS(
reconNamespaceSummaryManager,
- reconOMMetadataManager, omConfiguration);
+ reconOMMetadataManager, omConfiguration,
+ nsSummaryFlushToDBMaxThreshold);
}
/**
@@ -234,7 +240,7 @@ public void setUp() throws IOException {
// reinit Recon RocksDB's namespace CF.
reconNamespaceSummaryManager.clearNSSummaryTable();
nSSummaryTaskWithOBS.reprocessWithOBS(reconOMMetadataManager);
- nSSummaryTaskWithOBS.processWithOBS(processEventBatch());
+ nSSummaryTaskWithOBS.processWithOBS(processEventBatch(), 0);
nsSummaryForBucket1 =
reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID);
@@ -458,8 +464,8 @@ private void initializeNewOmMetadataManager(
omConfiguration = new OzoneConfiguration();
omConfiguration.set(OZONE_OM_DB_DIRS,
omDbDir.getAbsolutePath());
- omConfiguration.set(OMConfigKeys
- .OZONE_OM_ENABLE_FILESYSTEM_PATHS, "true");
+ omConfiguration.set(OmConfig.Keys.ENABLE_FILESYSTEM_PATHS, "true");
+ omConfiguration.set(OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, "10");
omMetadataManager = new OmMetadataManagerImpl(
omConfiguration, null);
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
index 040975a5ef..dc8d34f335 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
@@ -45,9 +45,9 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -155,7 +155,7 @@ private void initializeInjector() throws IOException {
globalStatsDao, getConfiguration(), reconOMMetadataManager);
nSSummaryTaskWithFso = new NSSummaryTaskWithFSO(
reconNamespaceSummaryManager, reconOMMetadataManager,
- ozoneConfiguration);
+ ozoneConfiguration, 10);
dslContext = getDslContext();
omTableInsightTask.setTables(omTableInsightTask.getTaskTables());
@@ -289,9 +289,9 @@ public void testReprocessForDeletedDirectory() throws
Exception {
// Generate NamespaceSummary for the OM DB
nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
- Pair<String, Boolean> result =
+ ReconOmTask.TaskResult result =
omTableInsightTask.reprocess(reconOMMetadataManager);
- assertTrue(result.getRight());
+ assertTrue(result.isTaskSuccess());
assertEquals(3, getCountForTable(DELETED_DIR_TABLE));
}
@@ -329,7 +329,7 @@ public void testProcessForDeletedDirectoryTable() throws
IOException {
DELETED_DIR_TABLE, PUT, null));
}
OMUpdateEventBatch putEventBatch = new OMUpdateEventBatch(putEvents, 0L);
- omTableInsightTask.process(putEventBatch);
+ omTableInsightTask.process(putEventBatch, Collections.emptyMap());
assertEquals(5, getCountForTable(DELETED_DIR_TABLE));
@@ -343,7 +343,7 @@ public void testProcessForDeletedDirectoryTable() throws
IOException {
getOmKeyInfo("vol1", "bucket1", DIR_ONE, 3L, false), DELETED_DIR_TABLE,
DELETE, null));
OMUpdateEventBatch deleteEventBatch = new OMUpdateEventBatch(deleteEvents,
0L);
- omTableInsightTask.process(deleteEventBatch);
+ omTableInsightTask.process(deleteEventBatch, Collections.emptyMap());
assertEquals(3, getCountForTable(DELETED_DIR_TABLE));
}
@@ -376,10 +376,10 @@ public void testReprocessForCount() throws Exception {
when(mockIter.next()).thenReturn(mockKeyValue);
}
- Pair<String, Boolean> result =
+ ReconOmTask.TaskResult result =
omTableInsightTask.reprocess(omMetadataManager);
- assertTrue(result.getRight());
+ assertTrue(result.isTaskSuccess());
assertEquals(5L, getCountForTable(KEY_TABLE));
assertEquals(5L, getCountForTable(VOLUME_TABLE));
assertEquals(5L, getCountForTable(BUCKET_TABLE));
@@ -397,9 +397,9 @@ public void testReprocessForOpenKeyTable() throws Exception
{
writeOpenKeyToOm(reconOMMetadataManager,
"key1", "Bucket3", "Volume3", null, 3L);
- Pair<String, Boolean> result =
+ ReconOmTask.TaskResult result =
omTableInsightTask.reprocess(reconOMMetadataManager);
- assertTrue(result.getRight());
+ assertTrue(result.isTaskSuccess());
assertEquals(3L, getCountForTable(OPEN_KEY_TABLE));
// Test for both replicated and unreplicated size for OPEN_KEY_TABLE
assertEquals(6L, getUnReplicatedSizeForTable(OPEN_KEY_TABLE));
@@ -416,9 +416,9 @@ public void testReprocessForOpenFileTable() throws
Exception {
writeOpenFileToOm(reconOMMetadataManager,
"file3", "Bucket3", "Volume3", "file3", 3, 0, 3, 3, null, 3L);
- Pair<String, Boolean> result =
+ ReconOmTask.TaskResult result =
omTableInsightTask.reprocess(reconOMMetadataManager);
- assertTrue(result.getRight());
+ assertTrue(result.isTaskSuccess());
assertEquals(3L, getCountForTable(OPEN_FILE_TABLE));
// Test for both replicated and unreplicated size for OPEN_FILE_TABLE
assertEquals(6L, getUnReplicatedSizeForTable(OPEN_FILE_TABLE));
@@ -440,9 +440,9 @@ public void testReprocessForDeletedTable() throws Exception
{
deletedKeysList3, "Bucket3", "Volume3");
- Pair<String, Boolean> result =
+ ReconOmTask.TaskResult result =
omTableInsightTask.reprocess(reconOMMetadataManager);
- assertTrue(result.getRight());
+ assertTrue(result.isTaskSuccess());
assertEquals(6L, getCountForTable(DELETED_TABLE));
// Test for both replicated and unreplicated size for DELETED_TABLE
assertEquals(600L, getUnReplicatedSizeForTable(DELETED_TABLE));
@@ -479,7 +479,7 @@ public void testProcessForCount() {
// Processing the initial batch of events
OMUpdateEventBatch initialBatch = new OMUpdateEventBatch(initialEvents,
0L);
- omTableInsightTask.process(initialBatch);
+ omTableInsightTask.process(initialBatch, Collections.emptyMap());
// Verifying the count in each table
for (String tableName : omTableInsightTask.getTaskTables()) {
@@ -508,7 +508,7 @@ public void testProcessForCount() {
// Processing the additional events
OMUpdateEventBatch additionalBatch =
new OMUpdateEventBatch(additionalEvents, 0L);
- omTableInsightTask.process(additionalBatch);
+ omTableInsightTask.process(additionalBatch, Collections.emptyMap());
// Verifying the final count in each table
for (String tableName : omTableInsightTask.getTaskTables()) {
if (tableName.equals(DELETED_TABLE)) {
@@ -537,7 +537,7 @@ public void testProcessForOpenKeyTableAndOpenFileTable() {
}
OMUpdateEventBatch putEventBatch = new OMUpdateEventBatch(putEvents, 0L);
- omTableInsightTask.process(putEventBatch);
+ omTableInsightTask.process(putEventBatch, Collections.emptyMap());
// After 5 PUTs, size should be 5 * 1000 = 5000
for (String tableName : new ArrayList<>(
@@ -555,7 +555,7 @@ public void testProcessForOpenKeyTableAndOpenFileTable() {
getOMUpdateEvent("item0", omKeyInfo, OPEN_FILE_TABLE, DELETE, null));
OMUpdateEventBatch deleteEventBatch = new OMUpdateEventBatch(deleteEvents,
0L);
- omTableInsightTask.process(deleteEventBatch);
+ omTableInsightTask.process(deleteEventBatch, Collections.emptyMap());
// After deleting "item0", size should be 4 * 1000 = 4000
for (String tableName : new ArrayList<>(
@@ -578,7 +578,7 @@ public void testProcessForOpenKeyTableAndOpenFileTable() {
}
OMUpdateEventBatch updateEventBatch = new OMUpdateEventBatch(updateEvents,
0L);
- omTableInsightTask.process(updateEventBatch);
+ omTableInsightTask.process(updateEventBatch, Collections.emptyMap());
// After updating "item1", size should be 4000 - 1000 + 2000 = 5000
// presentValue - oldValue + newValue = updatedValue
@@ -615,7 +615,7 @@ public void testProcessForDeletedTable() {
null));
}
OMUpdateEventBatch putEventBatch = new OMUpdateEventBatch(putEvents, 0L);
- omTableInsightTask.process(putEventBatch);
+ omTableInsightTask.process(putEventBatch, Collections.emptyMap());
// Each of the 5 RepeatedOmKeyInfo object has 5 OmKeyInfo obj,
// so total deleted keys should be 5 * 5 = 25
assertEquals(25L, getCountForTable(DELETED_TABLE));
@@ -631,7 +631,7 @@ public void testProcessForDeletedTable() {
getOMUpdateEvent("item0", repeatedOmKeyInfo, DELETED_TABLE, DELETE,
null));
OMUpdateEventBatch deleteEventBatch = new OMUpdateEventBatch(deleteEvents,
0L);
- omTableInsightTask.process(deleteEventBatch);
+ omTableInsightTask.process(deleteEventBatch, Collections.emptyMap());
// After deleting "item0" total deleted keys should be 20
assertEquals(20L, getCountForTable(DELETED_TABLE));
// After deleting "item0", size should be 4 * 1000 = 4000
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
index 384da46aa1..ad7eafd160 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
@@ -21,6 +21,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -28,7 +29,6 @@
import static org.mockito.Mockito.when;
import java.util.HashSet;
-import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
@@ -81,8 +81,8 @@ public void testRegisterTask() {
@Test
public void testConsumeOMEvents() throws Exception {
ReconOmTask reconOmTaskMock = getMockTask("MockTask");
- when(reconOmTaskMock.process(any(OMUpdateEventBatch.class)))
- .thenReturn(new ImmutablePair<>("MockTask", true));
+ when(reconOmTaskMock.process(any(OMUpdateEventBatch.class), anyMap()))
+ .thenReturn(new
ReconOmTask.TaskResult.Builder().setTaskName("MockTask").setTaskSuccess(true).build());
reconTaskController.registerTask(reconOmTaskMock);
OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L);
@@ -94,7 +94,7 @@ public void testConsumeOMEvents() throws Exception {
mock(OMMetadataManager.class));
verify(reconOmTaskMock, times(1))
- .process(any());
+ .process(any(), anyMap());
long endTime = System.currentTimeMillis();
ReconTaskStatus reconTaskStatus = reconTaskStatusDao.findById("MockTask");
@@ -111,7 +111,7 @@ public void testTaskRecordsFailureOnException() throws
Exception {
OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
// Throw exception when trying to run task
- when(reconOmTaskMock.process(any(OMUpdateEventBatch.class)))
+ when(reconOmTaskMock.process(any(OMUpdateEventBatch.class), anyMap()))
.thenThrow(new RuntimeException("Mock Failure"));
reconTaskController.registerTask(reconOmTaskMock);
when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L);
@@ -123,7 +123,7 @@ public void testTaskRecordsFailureOnException() throws
Exception {
mock(OMMetadataManager.class));
verify(reconOmTaskMock, times(1))
- .process(any());
+ .process(any(), anyMap());
long endTime = System.currentTimeMillis();
ReconTaskStatus reconTaskStatus = reconTaskStatusDao.findById("MockTask");
@@ -209,7 +209,7 @@ public void testReInitializeTasks() throws Exception {
ReconOmTask reconOmTaskMock =
getMockTask("MockTask2");
when(reconOmTaskMock.reprocess(omMetadataManagerMock))
- .thenReturn(new ImmutablePair<>("MockTask2", true));
+ .thenReturn(new
ReconOmTask.TaskResult.Builder().setTaskName("MockTask2").setTaskSuccess(true).build());
when(omMetadataManagerMock.getLastSequenceNumberFromDB()
).thenReturn(100L);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]