sumitagrawl commented on code in PR #8098:
URL: https://github.com/apache/ozone/pull/8098#discussion_r1997934324


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -269,36 +270,145 @@ public void start() {
         .values()
         .forEach(ReconOmTask::init);
 
-    // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber number not 
matching with
+    // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber is greater than 
zero and greater than
+    // 'OmSnapshotRequest' task's lastUpdatedSeqNumber number and not matching 
with
     // lastUpdatedSeqNumber number for any of the OM task, then just run 
reprocess for such tasks.
+
+    ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater =
+        
taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmSnapshotRequest.name());
     ReconTaskStatusUpdater deltaTaskStatusUpdater =
         
taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmDeltaRequest.name());
 
-    Map<String, ReconOmTask> reconOmTaskMap = 
reconTaskController.getRegisteredTasks()
-        .entrySet()
-        .stream()
-        .filter(entry -> {
-          String taskName = entry.getKey();
-          ReconTaskStatusUpdater taskStatusUpdater = 
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
-
-          return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) && 
 // Condition 1
-              !taskStatusUpdater.getLastUpdatedSeqNumber()
-                  .equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber());  
// Condition 2
-        })
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));  
// Collect into desired Map
+    Map<String, ReconOmTask> reconOmTaskMap =
+        verifyAndGetReconOmTaskMapForReprocess(fullSnapshotTaskStatusUpdater, 
deltaTaskStatusUpdater);
+    reconTaskController.reInitializeTasks(omMetadataManager, reconOmTaskMap);
+    startSyncDataFromOM(initialDelay);
+  }
+
+  /**
+   * This method validates the following cases for OM tasks to be reprocessed 
during Recon bootstrap process.
+   *
+   * Case 1: Normal bootstrap flow will take care of this scenario.
+   * full snapshot: DB not Updated
+   *   - Om Snapshot number - 0
+   *   - Om Delta snapshot number - 0
+   *   - All Om Tasks snapshot number - 0
+   *
+   * Case 2: This case will force Recon to run reprocess of only those OM 
tasks whose
+   *         last updated sequence number is zero.
+   * full snapshot: DB Updated, Tasks not reprocessed, Recon restarted or crash
+   *   - Om Snapshot number - 100000
+   *   - Om Delta snapshot number - 0
+   *   - Few Om Tasks snapshot number - 0, remaining Om tasks snapshot number 
- 100000
+   *
+   * Case 3: This case will force Recon to run reprocess of all OM tasks.
+   * full snapshot: DB Updated, Tasks not reprocessed, Recon restarted or crash
+   *   - Om Snapshot number - 100000
+   *   - Om Delta snapshot number - 0
+   *   - All Om Tasks snapshot number - 0
+   *
+   * Case 4: This case will not force to reprocess any OM tasks and on restart 
of Recon,
+   *         bootstrap normal flow will be okay.
+   * full snapshot: DB Updated, Tasks reprocessed, but before delta DB 
applied, Recon restarted or crash
+   *   - Om Snapshot number - 100000
+   *   - Om Delta snapshot number - 0
+   *   - All Om Tasks snapshot number - 100000
+   *
+   * Case 5: This case will force Recon to run reprocess of all OM tasks.
+   * full snapshot: DB Updated, Tasks reprocessed, delta DB updates also 
applied, recon restarted or crash,
+   *                but all delta tasks not processed.
+   *   - Om Snapshot number - 100000
+   *   - Om Delta snapshot number - 100010
+   *   - All Om Tasks snapshot number - 100000
+   *
+   * Case 6: This case will force Recon to run reprocess of only those OM 
tasks whose
+   *         last updated sequence number is less than Om Delta snapshot 
number.
+   * full snapshot: DB Updated, Tasks reprocessed, delta DB updates also 
applied, recon restarted or crash,
+   *                but delta tasks not processed.
+   *   - Om Snapshot number - 100000
+   *   - Om Delta snapshot number - 100010
+   *   - Few Om Tasks snapshot number - 100000  , Remaining Om Tasks snapshot 
number - 100010
+   *
+   * @param fullSnapshotTaskStatusUpdater
+   * @param deltaTaskStatusUpdater
+   * @return reconOmTaskMap
+   */
+  private Map<String, ReconOmTask> verifyAndGetReconOmTaskMapForReprocess(
+      ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater,
+      ReconTaskStatusUpdater deltaTaskStatusUpdater) {
+
+    Map<String, ReconOmTask> reconOmTaskMap = new HashMap<>();
+
+    for (Map.Entry<String, ReconOmTask> entry : 
reconTaskController.getRegisteredTasks().entrySet()) {
+      String taskName = entry.getKey();
+      ReconOmTask task = entry.getValue();
+      ReconTaskStatusUpdater taskStatusUpdater = 
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
+
+      if (taskStatusUpdater == null) {
+        continue; // Handle null cases
+      }
+
+      boolean isBootstrapTask =
+          shouldProcessTaskForBootstrap(fullSnapshotTaskStatusUpdater, 
deltaTaskStatusUpdater, taskName,
+              taskStatusUpdater);
+      boolean isDeltaTask =
+          shouldProcessTaskForDelta(fullSnapshotTaskStatusUpdater, 
deltaTaskStatusUpdater, taskName, taskStatusUpdater);
+
+      if (isBootstrapTask || isDeltaTask) {
+        if (reconOmTaskMap.containsKey(taskName)) {
+          LOG.warn("Duplicate task detected in reconOmTaskMap: {}", taskName);
+        }
+        reconOmTaskMap.put(taskName, task);
+      }
+    }
+
     if (!reconOmTaskMap.isEmpty()) {
-      LOG.info("Task name and last updated sequence number of tasks, that are 
not matching with " +
-          "the last updated sequence number of OmDeltaRequest task:\n");
+      LOG.info("Tasks with mismatched last updated sequence numbers:");
       LOG.info("{} -> {}", deltaTaskStatusUpdater.getTaskName(), 
deltaTaskStatusUpdater.getLastUpdatedSeqNumber());
-      reconOmTaskMap.keySet()
-          .forEach(taskName -> {
-            LOG.info("{} -> {}", taskName,
-                
taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber());
 
-          });
+      reconOmTaskMap.forEach((taskName, task) -> {
+        long lastUpdatedSeqNum = 
taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber();
+        LOG.info("{} -> {}", taskName, lastUpdatedSeqNum);
+      });
     }
-    reconTaskController.reInitializeTasks(omMetadataManager, reconOmTaskMap);
-    startSyncDataFromOM(initialDelay);
+
+    return reconOmTaskMap;
+  }
+
+  /**
+   * Determines if a task should be processed under the "bootstrap" condition.
+   */
+  private boolean shouldProcessTaskForBootstrap(
+      ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater,
+      ReconTaskStatusUpdater deltaTaskStatusUpdater,
+      String taskName,
+      ReconTaskStatusUpdater taskStatusUpdater) {
+    return fullSnapshotTaskStatusUpdater.getLastUpdatedSeqNumber() > 0
+        && deltaTaskStatusUpdater.getLastUpdatedSeqNumber() == 0
+        && !isOmSnapshotTask(taskName)

Review Comment:
   Why is the snapshot task itself not started?



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -269,36 +270,145 @@ public void start() {
         .values()
         .forEach(ReconOmTask::init);
 
-    // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber number not 
matching with
+    // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber is greater than 
zero and greater than
+    // 'OmSnapshotRequest' task's lastUpdatedSeqNumber number and not matching 
with
     // lastUpdatedSeqNumber number for any of the OM task, then just run 
reprocess for such tasks.
+
+    ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater =
+        
taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmSnapshotRequest.name());
     ReconTaskStatusUpdater deltaTaskStatusUpdater =
         
taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmDeltaRequest.name());
 
-    Map<String, ReconOmTask> reconOmTaskMap = 
reconTaskController.getRegisteredTasks()
-        .entrySet()
-        .stream()
-        .filter(entry -> {
-          String taskName = entry.getKey();
-          ReconTaskStatusUpdater taskStatusUpdater = 
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
-
-          return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) && 
 // Condition 1
-              !taskStatusUpdater.getLastUpdatedSeqNumber()
-                  .equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber());  
// Condition 2
-        })
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));  
// Collect into desired Map
+    Map<String, ReconOmTask> reconOmTaskMap =
+        verifyAndGetReconOmTaskMapForReprocess(fullSnapshotTaskStatusUpdater, 
deltaTaskStatusUpdater);
+    reconTaskController.reInitializeTasks(omMetadataManager, reconOmTaskMap);
+    startSyncDataFromOM(initialDelay);
+  }
+
+  /**
+   * This method validates the following cases for OM tasks to be reprocessed 
during Recon bootstrap process.
+   *
+   * Case 1: Normal bootstrap flow will take care of this scenario.
+   * full snapshot: DB not Updated
+   *   - Om Snapshot number - 0
+   *   - Om Delta snapshot number - 0
+   *   - All Om Tasks snapshot number - 0
+   *
+   * Case 2: This case will force Recon to run reprocess of only those OM 
tasks whose
+   *         last updated sequence number is zero.
+   * full snapshot: DB Updated, Tasks not reprocessed, Recon restarted or crash
+   *   - Om Snapshot number - 100000
+   *   - Om Delta snapshot number - 0
+   *   - Few Om Tasks snapshot number - 0, remaining Om tasks snapshot number 
- 100000
+   *
+   * Case 3: This case will force Recon to run reprocess of all OM tasks.
+   * full snapshot: DB Updated, Tasks not reprocessed, Recon restarted or crash
+   *   - Om Snapshot number - 100000
+   *   - Om Delta snapshot number - 0
+   *   - All Om Tasks snapshot number - 0
+   *
+   * Case 4: This case will not force to reprocess any OM tasks and on restart 
of Recon,
+   *         bootstrap normal flow will be okay.
+   * full snapshot: DB Updated, Tasks reprocessed, but before delta DB 
applied, Recon restarted or crash
+   *   - Om Snapshot number - 100000
+   *   - Om Delta snapshot number - 0
+   *   - All Om Tasks snapshot number - 100000
+   *
+   * Case 5: This case will force Recon to run reprocess of all OM tasks.
+   * full snapshot: DB Updated, Tasks reprocessed, delta DB updates also 
applied, recon restarted or crash,
+   *                but all delta tasks not processed.
+   *   - Om Snapshot number - 100000
+   *   - Om Delta snapshot number - 100010
+   *   - All Om Tasks snapshot number - 100000
+   *
+   * Case 6: This case will force Recon to run reprocess of only those OM 
tasks whose

Review Comment:
   How are case 5 and case 6 different? Isn't it simpler to say: if the fullSnapshot 
task's sequence number does not match the delta task's or a given OM task's sequence number, trigger reprocessing for those tasks?



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -269,36 +270,145 @@ public void start() {
         .values()
         .forEach(ReconOmTask::init);
 
-    // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber number not 
matching with
+    // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber is greater than 
zero and greater than
+    // 'OmSnapshotRequest' task's lastUpdatedSeqNumber number and not matching 
with
     // lastUpdatedSeqNumber number for any of the OM task, then just run 
reprocess for such tasks.
+
+    ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater =
+        
taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmSnapshotRequest.name());
     ReconTaskStatusUpdater deltaTaskStatusUpdater =
         
taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmDeltaRequest.name());
 
-    Map<String, ReconOmTask> reconOmTaskMap = 
reconTaskController.getRegisteredTasks()
-        .entrySet()
-        .stream()
-        .filter(entry -> {
-          String taskName = entry.getKey();
-          ReconTaskStatusUpdater taskStatusUpdater = 
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
-
-          return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) && 
 // Condition 1
-              !taskStatusUpdater.getLastUpdatedSeqNumber()
-                  .equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber());  
// Condition 2
-        })
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));  
// Collect into desired Map
+    Map<String, ReconOmTask> reconOmTaskMap =
+        verifyAndGetReconOmTaskMapForReprocess(fullSnapshotTaskStatusUpdater, 
deltaTaskStatusUpdater);
+    reconTaskController.reInitializeTasks(omMetadataManager, reconOmTaskMap);
+    startSyncDataFromOM(initialDelay);
+  }
+
+  /**
+   * This method validates the following cases for OM tasks to be reprocessed 
during Recon bootstrap process.
+   *
+   * Case 1: Normal bootstrap flow will take care of this scenario.
+   * full snapshot: DB not Updated
+   *   - Om Snapshot number - 0
+   *   - Om Delta snapshot number - 0
+   *   - All Om Tasks snapshot number - 0
+   *
+   * Case 2: This case will force Recon to run reprocess of only those OM 
tasks whose
+   *         last updated sequence number is zero.
+   * full snapshot: DB Updated, Tasks not reprocessed, Recon restarted or crash
+   *   - Om Snapshot number - 100000
+   *   - Om Delta snapshot number - 0
+   *   - Few Om Tasks snapshot number - 0, remaining Om tasks snapshot number 
- 100000
+   *
+   * Case 3: This case will force Recon to run reprocess of all OM tasks.
+   * full snapshot: DB Updated, Tasks not reprocessed, Recon restarted or crash
+   *   - Om Snapshot number - 100000
+   *   - Om Delta snapshot number - 0
+   *   - All Om Tasks snapshot number - 0
+   *
+   * Case 4: This case will not force to reprocess any OM tasks and on restart 
of Recon,
+   *         bootstrap normal flow will be okay.
+   * full snapshot: DB Updated, Tasks reprocessed, but before delta DB 
applied, Recon restarted or crash
+   *   - Om Snapshot number - 100000
+   *   - Om Delta snapshot number - 0
+   *   - All Om Tasks snapshot number - 100000
+   *
+   * Case 5: This case will force Recon to run reprocess of all OM tasks.
+   * full snapshot: DB Updated, Tasks reprocessed, delta DB updates also 
applied, recon restarted or crash,
+   *                but all delta tasks not processed.
+   *   - Om Snapshot number - 100000
+   *   - Om Delta snapshot number - 100010
+   *   - All Om Tasks snapshot number - 100000
+   *
+   * Case 6: This case will force Recon to run reprocess of only those OM 
tasks whose
+   *         last updated sequence number is less than Om Delta snapshot 
number.
+   * full snapshot: DB Updated, Tasks reprocessed, delta DB updates also 
applied, recon restarted or crash,
+   *                but delta tasks not processed.
+   *   - Om Snapshot number - 100000
+   *   - Om Delta snapshot number - 100010
+   *   - Few Om Tasks snapshot number - 100000  , Remaining Om Tasks snapshot 
number - 100010
+   *
+   * @param fullSnapshotTaskStatusUpdater
+   * @param deltaTaskStatusUpdater
+   * @return reconOmTaskMap
+   */
+  private Map<String, ReconOmTask> verifyAndGetReconOmTaskMapForReprocess(
+      ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater,
+      ReconTaskStatusUpdater deltaTaskStatusUpdater) {
+
+    Map<String, ReconOmTask> reconOmTaskMap = new HashMap<>();
+
+    for (Map.Entry<String, ReconOmTask> entry : 
reconTaskController.getRegisteredTasks().entrySet()) {
+      String taskName = entry.getKey();
+      ReconOmTask task = entry.getValue();
+      ReconTaskStatusUpdater taskStatusUpdater = 
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
+
+      if (taskStatusUpdater == null) {
+        continue; // Handle null cases
+      }
+
+      boolean isBootstrapTask =
+          shouldProcessTaskForBootstrap(fullSnapshotTaskStatusUpdater, 
deltaTaskStatusUpdater, taskName,
+              taskStatusUpdater);
+      boolean isDeltaTask =
+          shouldProcessTaskForDelta(fullSnapshotTaskStatusUpdater, 
deltaTaskStatusUpdater, taskName, taskStatusUpdater);
+
+      if (isBootstrapTask || isDeltaTask) {
+        if (reconOmTaskMap.containsKey(taskName)) {
+          LOG.warn("Duplicate task detected in reconOmTaskMap: {}", taskName);
+        }
+        reconOmTaskMap.put(taskName, task);
+      }
+    }
+
     if (!reconOmTaskMap.isEmpty()) {
-      LOG.info("Task name and last updated sequence number of tasks, that are 
not matching with " +
-          "the last updated sequence number of OmDeltaRequest task:\n");
+      LOG.info("Tasks with mismatched last updated sequence numbers:");
       LOG.info("{} -> {}", deltaTaskStatusUpdater.getTaskName(), 
deltaTaskStatusUpdater.getLastUpdatedSeqNumber());
-      reconOmTaskMap.keySet()
-          .forEach(taskName -> {
-            LOG.info("{} -> {}", taskName,
-                
taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber());

Review Comment:
   How are fullSnapshotTask, the delta task, and taskStatus related?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to