devmadhuu commented on code in PR #9243:
URL: https://github.com/apache/ozone/pull/9243#discussion_r2488958689


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.rocksdb.LiveFileMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to iterate through a table in parallel by breaking table into 
multiple iterators.
+ */
+public class ParallelTableIteratorOperation<K extends Comparable<K>, V> 
implements Closeable {

Review Comment:
   This class can be moved to ozone-common module as it can be used for 
iterating other db tables also in future and class does not contain any 
specific logic related to recon rocks DB tables.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -702,145 +702,153 @@ public boolean syncDataFromOM() {
       try {
         long currentSequenceNumber = getCurrentOMDBSequenceNumber();
         LOG.info("Seq number of Recon's OM DB : {}", currentSequenceNumber);
-        boolean fullSnapshot = false;
+        boolean fullSnapshot = true;
+
 
-        if (currentSequenceNumber <= 0) {
-          fullSnapshot = true;
+        if (reInitializeTasksCalled.compareAndSet(false, true)) {
+          LOG.info("Calling reprocess on Recon tasks.");
+          reconTaskController.reInitializeTasks(omMetadataManager, null);
         } else {
-          // Get updates from OM and apply to local Recon OM DB and update 
task status in table
-          deltaReconTaskStatusUpdater.recordRunStart();
-          int loopCount = 0;
-          long fromSequenceNumber = currentSequenceNumber;
-          long diffBetweenOMDbAndReconDBSeqNumber = deltaUpdateLimit + 1;
-          /**
-           * This loop will continue to fetch and apply OM DB updates and with 
every
-           * OM DB fetch request, it will fetch {@code deltaUpdateLimit} count 
of DB updates.
-           * It continues to fetch from OM till the lag, between OM DB WAL 
sequence number
-           * and Recon OM DB snapshot WAL sequence number, is less than this 
lag threshold value.
-           * In high OM write TPS cluster, this simulates continuous pull from 
OM without any delay.
-           */
-          while (diffBetweenOMDbAndReconDBSeqNumber > omDBLagThreshold) {
-            try (OMDBUpdatesHandler omdbUpdatesHandler =
-                     new OMDBUpdatesHandler(omMetadataManager)) {
-
-              // If interrupt was previously signalled,
-              // we should check for it before starting delta update sync.
-              if (Thread.currentThread().isInterrupted()) {
-                throw new InterruptedException("Thread interrupted during 
delta update.");
-              }
-              diffBetweenOMDbAndReconDBSeqNumber =
-                  getAndApplyDeltaUpdatesFromOM(currentSequenceNumber, 
omdbUpdatesHandler);
-              deltaReconTaskStatusUpdater.setLastTaskRunStatus(0);
-              // Keeping last updated sequence number for both full and delta 
tasks to be same
-              // because sequence number of DB denotes and points to same OM 
DB copy of Recon,
-              // even though two different tasks are updating the DB at 
different conditions, but
-              // it tells the sync state with actual OM DB for the same Recon 
OM DB copy.
-              
deltaReconTaskStatusUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
-              
fullSnapshotReconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
-              deltaReconTaskStatusUpdater.recordRunCompletion();
-              fullSnapshotReconTaskUpdater.updateDetails();
-              // Update the current OM metadata manager in task controller
-              reconTaskController.updateOMMetadataManager(omMetadataManager);
-              
-              // Pass on DB update events to tasks that are listening.
-              reconTaskController.consumeOMEvents(new OMUpdateEventBatch(
-                  omdbUpdatesHandler.getEvents(), 
omdbUpdatesHandler.getLatestSequenceNumber()), omMetadataManager);
-              
-              // Check if task reinitialization is needed due to buffer 
overflow or task failures
-              boolean bufferOverflowed = 
reconTaskController.hasEventBufferOverflowed();
-              boolean tasksFailed = reconTaskController.hasTasksFailed();
-
-              if (bufferOverflowed || tasksFailed) {
-                ReconTaskReInitializationEvent.ReInitializationReason reason = 
bufferOverflowed ?
-                    
ReconTaskReInitializationEvent.ReInitializationReason.BUFFER_OVERFLOW :
-                    
ReconTaskReInitializationEvent.ReInitializationReason.TASK_FAILURES;
-
-                LOG.warn("Detected condition for task reinitialization: {}, 
queueing async reinitialization event",
-                    reason);
-
-                markDeltaTaskStatusAsFailed(deltaReconTaskStatusUpdater);
-
-                // Queue async reinitialization event - checkpoint creation 
and retry logic is handled internally
-                ReconTaskController.ReInitializationResult result =
-                    reconTaskController.queueReInitializationEvent(reason);
-
-                //TODO: Create a metric to track this event buffer overflow or 
task failure event
-                boolean triggerFullSnapshot =
-                    Optional.ofNullable(result)
-                        .map(r -> {
-                          switch (r) {
-                          case MAX_RETRIES_EXCEEDED:
-                            LOG.warn(
-                                "Reinitialization queue failures exceeded 
maximum retries, triggering full snapshot " +
-                                    "fallback");
-                            return true;
-
-                          case RETRY_LATER:
-                            LOG.debug("Reinitialization event queueing will be 
retried in next iteration");
-                            return false;
-
-                          default:
-                            LOG.info("Reinitialization event successfully 
queued");
-                            return false;
-                          }
-                        })
-                        .orElseGet(() -> {
-                          LOG.error(
-                              "ReInitializationResult is null, something went 
wrong in queueing reinitialization " +
-                                  "event");
-                          return true;
-                        });
-
-                if (triggerFullSnapshot) {
-                  fullSnapshot = true;
-                }
-              }
-              currentSequenceNumber = getCurrentOMDBSequenceNumber();
-              LOG.debug("Updated current sequence number: {}", 
currentSequenceNumber);
-              loopCount++;
-            } catch (InterruptedException intEx) {
-              LOG.error("OM DB Delta update sync thread was interrupted and 
delta sync failed.");
-              // We are updating the table even if it didn't run i.e. got 
interrupted beforehand
-              // to indicate that a task was supposed to run, but it didn't.
-              markDeltaTaskStatusAsFailed(deltaReconTaskStatusUpdater);
-              Thread.currentThread().interrupt();
-              // Since thread is interrupted, we do not fall back to snapshot 
sync.
-              // Return with sync failed status.
-              return false;
-            } catch (Exception e) {
-              markDeltaTaskStatusAsFailed(deltaReconTaskStatusUpdater);
-              LOG.warn("Unable to get and apply delta updates from OM: {}, 
falling back to full snapshot",
-                  e.getMessage());
-              fullSnapshot = true;
-            }
-            if (fullSnapshot) {
-              break;
-            }
-          }
-          LOG.info("Delta updates received from OM : {} loops, {} records", 
loopCount,
-              getCurrentOMDBSequenceNumber() - fromSequenceNumber);
+          LOG.info("reInitializeTasks already called once; skipping.");
         }
 
-        if (fullSnapshot) {
-          try {
-            executeFullSnapshot(fullSnapshotReconTaskUpdater, 
deltaReconTaskStatusUpdater);
-          } catch (InterruptedException intEx) {
-            LOG.error("OM DB Snapshot update sync thread was interrupted.");
-            fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
-            fullSnapshotReconTaskUpdater.recordRunCompletion();
-            Thread.currentThread().interrupt();
-            // Mark sync status as failed.
-            return false;
-          } catch (Exception e) {
-            metrics.incrNumSnapshotRequestsFailed();
-            fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
-            fullSnapshotReconTaskUpdater.recordRunCompletion();
-            LOG.error("Unable to update Recon's metadata with new OM DB. ", e);
-            // Update health status in ReconContext
-            reconContext.updateHealthStatus(new AtomicBoolean(false));
-            
reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
-          }
-        }
+//        if (currentSequenceNumber <= 0) {

Review Comment:
   Why all this code is commented ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to