devmadhuu commented on code in PR #7960:
URL: https://github.com/apache/ozone/pull/7960#discussion_r1981458723
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java:
##########
@@ -40,10 +40,17 @@ void consumeOMEvents(OMUpdateEventBatch events,
OMMetadataManager omMetadataManager);
/**
- * Pass on the handle to a new OM DB instance to the registered tasks.
- * @param omMetadataManager OM Metadata Manager instance
+ * Reinitializes the registered Recon OM tasks with a new OM Metadata
Manager instance.
+ *
+ * @param omMetadataManager the OM Metadata Manager instance to be used for
reinitialization.
+ * @param reconOmTaskMap a map of Recon OM tasks whose lastUpdatedSeqNumber
does not match
+ * the lastUpdatedSeqNumber from the previous run of
the 'OmDeltaRequest' task.
+ * These tasks will be reinitialized to process the
delta OM DB updates
+ * received in the last run of 'OmDeltaRequest'.
+ * If {@code reconOmTaskMap} is null, all registered
Recon OM tasks
+ * will be reinitialized.
Review Comment:
Ok sure.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -267,6 +268,36 @@ public void start() {
reconTaskController.getRegisteredTasks()
.values()
.forEach(ReconOmTask::init);
+
+ // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber number not
matching with
+ // lastUpdatedSeqNumber number for any of the OM task, then just run
reprocess for such tasks.
+ ReconTaskStatusUpdater deltaTaskStatusUpdater =
+
taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmDeltaRequest.name());
+
+ Map<String, ReconOmTask> reconOmTaskMap =
reconTaskController.getRegisteredTasks()
+ .entrySet()
+ .stream()
+ .filter(entry -> {
+ String taskName = entry.getKey();
+ ReconTaskStatusUpdater taskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
+
+ return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) &&
// Condition 1
+ !taskStatusUpdater.getLastUpdatedSeqNumber()
+ .equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber());
// Condition 2
+ })
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// Collect into desired Map
+ if (!reconOmTaskMap.isEmpty()) {
+ LOG.info("Task details of such tasks whose lastUpdatedSeqNumber number
not matching with " +
+ "lastUpdatedSeqNumber of 'OmDeltaRequest' task::\n");
+ LOG.info("{}->{}", deltaTaskStatusUpdater.getTaskName(),
deltaTaskStatusUpdater.getLastUpdatedSeqNumber());
+ reconOmTaskMap.keySet()
+ .forEach(taskName -> {
+ LOG.info("{}->{}", taskName,
+
taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber());
+
+ });
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]