devmadhuu commented on code in PR #8098:
URL: https://github.com/apache/ozone/pull/8098#discussion_r1998034887
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -269,36 +270,145 @@ public void start() {
.values()
.forEach(ReconOmTask::init);
- // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber number not
matching with
+ // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber is greater than
zero and greater than
+ // 'OmSnapshotRequest' task's lastUpdatedSeqNumber number and not matching
with
// lastUpdatedSeqNumber number for any of the OM task, then just run
reprocess for such tasks.
+
+ ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater =
+
taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmSnapshotRequest.name());
ReconTaskStatusUpdater deltaTaskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmDeltaRequest.name());
- Map<String, ReconOmTask> reconOmTaskMap =
reconTaskController.getRegisteredTasks()
- .entrySet()
- .stream()
- .filter(entry -> {
- String taskName = entry.getKey();
- ReconTaskStatusUpdater taskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
-
- return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) &&
// Condition 1
- !taskStatusUpdater.getLastUpdatedSeqNumber()
- .equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber());
// Condition 2
- })
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// Collect into desired Map
+ Map<String, ReconOmTask> reconOmTaskMap =
+ verifyAndGetReconOmTaskMapForReprocess(fullSnapshotTaskStatusUpdater,
deltaTaskStatusUpdater);
+ reconTaskController.reInitializeTasks(omMetadataManager, reconOmTaskMap);
+ startSyncDataFromOM(initialDelay);
+ }
+
+ /**
+ * This method validates the following cases for OM tasks to be reprocessed
during Recon bootstrap process.
+ *
+ * Case 1: Normal bootstrap flow will take care of this scenario.
+ * full snapshot: DB not Updated
+ * - Om Snapshot number - 0
+ * - Om Delta snapshot number - 0
+ * - All Om Tasks snapshot number - 0
+ *
+ * Case 2: This case will force Recon to run reprocess of only those OM
tasks whose
+ * last updated sequence number is zero.
+ * full snapshot: DB Updated, Tasks not reprocessed, Recon restarted or crash
+ * - Om Snapshot number - 100000
+ * - Om Delta snapshot number - 0
+ * - Few Om Tasks snapshot number - 0, remaining Om tasks snapshot number
- 100000
+ *
+ * Case 3: This case will force Recon to run reprocess of all OM tasks.
+ * full snapshot: DB Updated, Tasks not reprocessed, Recon restarted or crash
+ * - Om Snapshot number - 100000
+ * - Om Delta snapshot number - 0
+ * - All Om Tasks snapshot number - 0
+ *
+ * Case 4: This case will not force to reprocess any OM tasks and on restart
of Recon,
+ * bootstrap normal flow will be okay.
+ * full snapshot: DB Updated, Tasks reprocessed, but before delta DB
applied, Recon restarted or crash
+ * - Om Snapshot number - 100000
+ * - Om Delta snapshot number - 0
+ * - All Om Tasks snapshot number - 100000
+ *
+ * Case 5: This case will force Recon to run reprocess of all OM tasks.
+ * full snapshot: DB Updated, Tasks reprocessed, delta DB updates also
applied, recon restarted or crash,
+ * but all delta tasks not processed.
+ * - Om Snapshot number - 100000
+ * - Om Delta snapshot number - 100010
+ * - All Om Tasks snapshot number - 100000
+ *
+ * Case 6: This case will force Recon to run reprocess of only those OM
tasks whose
+ * last updated sequence number is less than Om Delta snapshot
number.
+ * full snapshot: DB Updated, Tasks reprocessed, delta DB updates also
applied, recon restarted or crash,
+ * but delta tasks not processed.
+ * - Om Snapshot number - 100000
+ * - Om Delta snapshot number - 100010
+ * - Few Om Tasks snapshot number - 100000 , Remaining Om Tasks snapshot
number - 100010
+ *
+ * @param fullSnapshotTaskStatusUpdater
+ * @param deltaTaskStatusUpdater
+ * @return reconOmTaskMap
+ */
+ private Map<String, ReconOmTask> verifyAndGetReconOmTaskMapForReprocess(
+ ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater,
+ ReconTaskStatusUpdater deltaTaskStatusUpdater) {
+
+ Map<String, ReconOmTask> reconOmTaskMap = new HashMap<>();
+
+ for (Map.Entry<String, ReconOmTask> entry :
reconTaskController.getRegisteredTasks().entrySet()) {
+ String taskName = entry.getKey();
+ ReconOmTask task = entry.getValue();
+ ReconTaskStatusUpdater taskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
+
+ if (taskStatusUpdater == null) {
+ continue; // Handle null cases
+ }
+
+ boolean isBootstrapTask =
+ shouldProcessTaskForBootstrap(fullSnapshotTaskStatusUpdater,
deltaTaskStatusUpdater, taskName,
+ taskStatusUpdater);
+ boolean isDeltaTask =
+ shouldProcessTaskForDelta(fullSnapshotTaskStatusUpdater,
deltaTaskStatusUpdater, taskName, taskStatusUpdater);
+
+ if (isBootstrapTask || isDeltaTask) {
+ if (reconOmTaskMap.containsKey(taskName)) {
+ LOG.warn("Duplicate task detected in reconOmTaskMap: {}", taskName);
+ }
+ reconOmTaskMap.put(taskName, task);
+ }
+ }
+
if (!reconOmTaskMap.isEmpty()) {
- LOG.info("Task name and last updated sequence number of tasks, that are
not matching with " +
- "the last updated sequence number of OmDeltaRequest task:\n");
+ LOG.info("Tasks with mismatched last updated sequence numbers:");
LOG.info("{} -> {}", deltaTaskStatusUpdater.getTaskName(),
deltaTaskStatusUpdater.getLastUpdatedSeqNumber());
- reconOmTaskMap.keySet()
- .forEach(taskName -> {
- LOG.info("{} -> {}", taskName,
-
taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber());
Review Comment:
We don't want to fetch the DB updates again; we just want the OM tasks to be
reprocessed if they failed in their last run (whether in the bootstrap case or the
delta-updates case) and Recon subsequently had to be restarted or crashed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]