devmadhuu commented on code in PR #7960:
URL: https://github.com/apache/ozone/pull/7960#discussion_r1981458723


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java:
##########
@@ -40,10 +40,17 @@ void consumeOMEvents(OMUpdateEventBatch events,
                        OMMetadataManager omMetadataManager);
 
   /**
-   * Pass on the handle to a new OM DB instance to the registered tasks.
-   * @param omMetadataManager OM Metadata Manager instance
+   * Reinitializes the registered Recon OM tasks with a new OM Metadata Manager instance.
+   *
+   * @param omMetadataManager the OM Metadata Manager instance to be used for reinitialization.
+   * @param reconOmTaskMap a map of Recon OM tasks whose lastUpdatedSeqNumber does not match
+   *                       the lastUpdatedSeqNumber from the previous run of the 'OmDeltaRequest' task.
+   *                       These tasks will be reinitialized to process the delta OM DB updates
+   *                       received in the last run of 'OmDeltaRequest'.
+   *                       If {@code reconOmTaskMap} is null, all registered Recon OM tasks
+   *                       will be reinitialized.

Review Comment:
   Ok sure.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -267,6 +268,36 @@ public void start() {
     reconTaskController.getRegisteredTasks()
         .values()
         .forEach(ReconOmTask::init);
+
+    // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber number not matching with
+    // lastUpdatedSeqNumber number for any of the OM task, then just run reprocess for such tasks.
+    ReconTaskStatusUpdater deltaTaskStatusUpdater =
+        taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmDeltaRequest.name());
+
+    Map<String, ReconOmTask> reconOmTaskMap = reconTaskController.getRegisteredTasks()
+        .entrySet()
+        .stream()
+        .filter(entry -> {
+          String taskName = entry.getKey();
+          ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
+
+          return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) &&  // Condition 1
+              !taskStatusUpdater.getLastUpdatedSeqNumber()
+                  .equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber());  // Condition 2
+        })
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));  // Collect into desired Map
+    if (!reconOmTaskMap.isEmpty()) {
+      LOG.info("Task details of such tasks whose lastUpdatedSeqNumber number not matching with " +
+          "lastUpdatedSeqNumber of 'OmDeltaRequest' task::\n");
+      LOG.info("{}->{}", deltaTaskStatusUpdater.getTaskName(), deltaTaskStatusUpdater.getLastUpdatedSeqNumber());
+      reconOmTaskMap.keySet()
+          .forEach(taskName -> {
+            LOG.info("{}->{}", taskName,
+                taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber());
+
+          });

Review Comment:
   Done.
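
   For reference, the selection rule in the hunk above (skip the 'OmDeltaRequest' entry itself, keep every task whose sequence number differs from it) can be sketched in isolation. This is only a minimal sketch and not the PR's code: the class `StaleTaskFilter`, the method `selectStaleTasks`, the task names `TaskA`/`TaskB`, and the plain `Map<String, Long>` standing in for the task status updaters are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class StaleTaskFilter {
  // Name of the delta task, taken from the diff; everything else here is illustrative.
  static final String DELTA_TASK = "OmDeltaRequest";

  /**
   * Returns the tasks whose last updated sequence number differs from the
   * delta task's. The input map (task name -> sequence number) is a
   * hypothetical stand-in for the per-task status updaters.
   */
  static Map<String, Long> selectStaleTasks(Map<String, Long> seqByTask) {
    Long deltaSeq = seqByTask.get(DELTA_TASK);
    return seqByTask.entrySet().stream()
        // Condition 1: never select the delta task itself.
        .filter(e -> !e.getKey().equals(DELTA_TASK))
        // Condition 2: keep only tasks whose sequence number does not match.
        .filter(e -> !Objects.equals(e.getValue(), deltaSeq))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  public static void main(String[] args) {
    Map<String, Long> seqByTask = new LinkedHashMap<>();
    seqByTask.put(DELTA_TASK, 100L);
    seqByTask.put("TaskA", 100L); // in sync with the delta task
    seqByTask.put("TaskB", 90L);  // behind, so it is selected
    System.out.println(selectStaleTasks(seqByTask)); // prints {TaskB=90}
  }
}
```

   Comparing with `Objects.equals` rather than `==` matters in this sketch because the sequence numbers are boxed `Long` values; it also keeps the comparison null-safe.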



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

