sumitagrawl commented on code in PR #9243:
URL: https://github.com/apache/ozone/pull/9243#discussion_r2588382717


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DeletedKeysInsightHandler.java:
##########
@@ -112,14 +113,14 @@ public void handleUpdateEvent(OMDBUpdateEvent<String, 
Object> event,
    * pending deletion in Ozone.
    */
   @Override
-  public Triple<Long, Long, Long> getTableSizeAndCount(
-      TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator)
-      throws IOException {
+  public Triple<Long, Long, Long> getTableSizeAndCount(String tableName,
+      OMMetadataManager omMetadataManager) throws IOException {
     long count = 0;
     long unReplicatedSize = 0;
     long replicatedSize = 0;
 
-    if (iterator != null) {
+    Table<String, ?> table = (Table<String, ?>) 
omMetadataManager.getTable(tableName);

Review Comment:
   This does not need to be generic; you can use RepeatedOmKeyInfo instead of "?".



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OpenKeysInsightHandler.java:
##########
@@ -127,14 +128,14 @@ public void handleUpdateEvent(OMDBUpdateEvent<String, 
Object> event,
    * that are currently open in the backend.
    */
   @Override
-  public Triple<Long, Long, Long> getTableSizeAndCount(
-      TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator)
-      throws IOException {
+  public Triple<Long, Long, Long> getTableSizeAndCount(String tableName,
+      OMMetadataManager omMetadataManager) throws IOException {
     long count = 0;
     long unReplicatedSize = 0;
     long replicatedSize = 0;
 
-    if (iterator != null) {
+    Table<String, ?> table = (Table<String, ?>) 
omMetadataManager.getTable(tableName);

Review Comment:
   You can use OmKeyInfo as the value type instead of the generic wildcard.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java:
##########
@@ -51,31 +59,63 @@ public abstract class ContainerKeyMapperHelper {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ContainerKeyMapperHelper.class);
 
-  // Static lock to guard table truncation.
-  private static final Object TRUNCATE_LOCK = new Object();
+  // Single lock to guard all initialization operations (table truncation + 
map clearing)
+  private static final Object INITIALIZATION_LOCK = new Object();
 
   /**
-   * Ensures that the container key tables are truncated only once before 
reprocessing.
-   * Uses an AtomicBoolean to track if truncation has already been performed.
-   *
-   * @param reconContainerMetadataManager The metadata manager instance 
responsible for DB operations.
+   * Reference counter to track how many tasks are actively using the shared 
map.
+   * Initialized to 2 (FSO + OBS tasks) during initialization.
+   * Each task decrements on completion. Last task (count reaches 0) clears 
the shared map.
+   */
+  private static final AtomicInteger ACTIVE_TASK_COUNT = new AtomicInteger(0);
+
+  /**
+   * SHARED across all tasks (FSO + OBS) for cross-task synchronization.
+   * Maps: ContainerId -> AtomicLong (key count in that container)
+   * Purpose: Prevents data corruption when FSO and OBS tasks run concurrently
+   * and both write to the same container IDs. Both tasks accumulate into this
+   * single shared map, ensuring final DB write contains complete totals.
+   */
+  private static final Map<Long, AtomicLong> SHARED_CONTAINER_KEY_COUNT_MAP = 
new ConcurrentHashMap<>();
+
+  /**
+   * Performs one-time initialization for Container Key Mapper tasks.
+   * This includes:
+   * 1. Truncating container key tables in DB
+   * 2. Clearing the shared container count map
+   * 
+   * This method is called by both FSO and OBS tasks at the start of reprocess.
+   * Only the first task to call this will perform initialization.
+   * 
+   * @param reconContainerMetadataManager The metadata manager for DB 
operations
+   * @param taskName Name of the task calling this method (for logging)
+   * @throws RuntimeException if initialization fails
    */
-  public static void truncateTablesIfNeeded(ReconContainerMetadataManager 
reconContainerMetadataManager,
-                                            String taskName) {
-    synchronized (TRUNCATE_LOCK) {
-      if (ReconConstants.CONTAINER_KEY_TABLES_TRUNCATED.compareAndSet(false, 
true)) {
+  private static void initializeContainerKeyMapperIfNeeded(
+      ReconContainerMetadataManager reconContainerMetadataManager,
+      String taskName) {
+    
+    synchronized (INITIALIZATION_LOCK) {
+      // Check if already initialized by another task
+      if (ReconConstants.CONTAINER_KEY_MAPPER_INITIALIZED.compareAndSet(false, 
true)) {
         try {
-          // Perform table truncation
+          // Step 1: Truncate tables
           
reconContainerMetadataManager.reinitWithNewContainerDataFromOm(Collections.emptyMap());
-          LOG.debug("Successfully truncated container key tables.");
+          
+          // Step 2: Clear shared map
+          SHARED_CONTAINER_KEY_COUNT_MAP.clear();
+          
+          // Step 3: Increment the refrence counter for active tasks
+          ACTIVE_TASK_COUNT.incrementAndGet();

Review Comment:
   This needs to be done inside the lock, but before the 
CONTAINER_KEY_MAPPER_INITIALIZED check, as it has to be done for every call.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java:
##########
@@ -51,31 +59,63 @@ public abstract class ContainerKeyMapperHelper {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ContainerKeyMapperHelper.class);
 
-  // Static lock to guard table truncation.
-  private static final Object TRUNCATE_LOCK = new Object();
+  // Single lock to guard all initialization operations (table truncation + 
map clearing)
+  private static final Object INITIALIZATION_LOCK = new Object();
 
   /**
-   * Ensures that the container key tables are truncated only once before 
reprocessing.
-   * Uses an AtomicBoolean to track if truncation has already been performed.
-   *
-   * @param reconContainerMetadataManager The metadata manager instance 
responsible for DB operations.
+   * Reference counter to track how many tasks are actively using the shared 
map.
+   * Initialized to 2 (FSO + OBS tasks) during initialization.
+   * Each task decrements on completion. Last task (count reaches 0) clears 
the shared map.
+   */
+  private static final AtomicInteger ACTIVE_TASK_COUNT = new AtomicInteger(0);
+
+  /**
+   * SHARED across all tasks (FSO + OBS) for cross-task synchronization.
+   * Maps: ContainerId -> AtomicLong (key count in that container)
+   * Purpose: Prevents data corruption when FSO and OBS tasks run concurrently
+   * and both write to the same container IDs. Both tasks accumulate into this
+   * single shared map, ensuring final DB write contains complete totals.
+   */
+  private static final Map<Long, AtomicLong> SHARED_CONTAINER_KEY_COUNT_MAP = 
new ConcurrentHashMap<>();
+
+  /**
+   * Performs one-time initialization for Container Key Mapper tasks.
+   * This includes:
+   * 1. Truncating container key tables in DB
+   * 2. Clearing the shared container count map
+   * 
+   * This method is called by both FSO and OBS tasks at the start of reprocess.
+   * Only the first task to call this will perform initialization.
+   * 
+   * @param reconContainerMetadataManager The metadata manager for DB 
operations
+   * @param taskName Name of the task calling this method (for logging)
+   * @throws RuntimeException if initialization fails
    */
-  public static void truncateTablesIfNeeded(ReconContainerMetadataManager 
reconContainerMetadataManager,
-                                            String taskName) {
-    synchronized (TRUNCATE_LOCK) {
-      if (ReconConstants.CONTAINER_KEY_TABLES_TRUNCATED.compareAndSet(false, 
true)) {
+  private static void initializeContainerKeyMapperIfNeeded(
+      ReconContainerMetadataManager reconContainerMetadataManager,
+      String taskName) {
+    
+    synchronized (INITIALIZATION_LOCK) {
+      // Check if already initialized by another task
+      if (ReconConstants.CONTAINER_KEY_MAPPER_INITIALIZED.compareAndSet(false, 
true)) {
         try {
-          // Perform table truncation
+          // Step 1: Truncate tables
           
reconContainerMetadataManager.reinitWithNewContainerDataFromOm(Collections.emptyMap());
-          LOG.debug("Successfully truncated container key tables.");
+          
+          // Step 2: Clear shared map
+          SHARED_CONTAINER_KEY_COUNT_MAP.clear();
+          
+          // Step 3: Increment the refrence counter for active tasks
+          ACTIVE_TASK_COUNT.incrementAndGet();
+
         } catch (Exception e) {
-          // Reset the flag so truncation can be retried
-          ReconConstants.CONTAINER_KEY_TABLES_TRUNCATED.set(false);
-          LOG.error("Error while truncating container key tables for task {}. 
Resetting flag.", taskName, e);
-          throw new RuntimeException("Table truncation failed", e);
+          // CRITICAL: Reset flag so another task can retry
+          ReconConstants.CONTAINER_KEY_MAPPER_INITIALIZED.set(false);
+          LOG.error("{}: Container Key Mapper initialization failed. Resetting 
flag for retry.", taskName, e);

Review Comment:
   Need to decrement the active count on exception with the above change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to