devmadhuu commented on code in PR #8098:
URL: https://github.com/apache/ozone/pull/8098#discussion_r1998033538
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -269,36 +270,145 @@ public void start() {
.values()
.forEach(ReconOmTask::init);
- // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber number not
matching with
+ // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber is greater than
zero and greater than
+ // 'OmSnapshotRequest' task's lastUpdatedSeqNumber number and not matching
with
// lastUpdatedSeqNumber number for any of the OM task, then just run
reprocess for such tasks.
+
+ ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater =
+
taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmSnapshotRequest.name());
ReconTaskStatusUpdater deltaTaskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmDeltaRequest.name());
- Map<String, ReconOmTask> reconOmTaskMap =
reconTaskController.getRegisteredTasks()
- .entrySet()
- .stream()
- .filter(entry -> {
- String taskName = entry.getKey();
- ReconTaskStatusUpdater taskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
-
- return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) &&
// Condition 1
- !taskStatusUpdater.getLastUpdatedSeqNumber()
- .equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber());
// Condition 2
- })
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// Collect into desired Map
+ Map<String, ReconOmTask> reconOmTaskMap =
+ verifyAndGetReconOmTaskMapForReprocess(fullSnapshotTaskStatusUpdater,
deltaTaskStatusUpdater);
+ reconTaskController.reInitializeTasks(omMetadataManager, reconOmTaskMap);
+ startSyncDataFromOM(initialDelay);
+ }
+
+ /**
+ * This method validates the following cases for OM tasks to be reprocessed
during Recon bootstrap process.
+ *
+ * Case 1: Normal bootstrap flow will take care of this scenario.
+ * full snapshot: DB not Updated
+ * - Om Snapshot number - 0
+ * - Om Delta snapshot number - 0
+ * - All Om Tasks snapshot number - 0
+ *
+ * Case 2: This case will force Recon to run reprocess of only those OM
tasks whose
+ * last updated sequence number is zero.
+ * full snapshot: DB Updated, Tasks not reprocessed, Recon restarted or crash
+ * - Om Snapshot number - 100000
+ * - Om Delta snapshot number - 0
+ * - Few Om Tasks snapshot number - 0, remaining Om tasks snapshot number
- 100000
+ *
+ * Case 3: This case will force Recon to run reprocess of all OM tasks.
+ * full snapshot: DB Updated, Tasks not reprocessed, Recon restarted or crash
+ * - Om Snapshot number - 100000
+ * - Om Delta snapshot number - 0
+ * - All Om Tasks snapshot number - 0
+ *
+ * Case 4: This case will not force to reprocess any OM tasks and on restart
of Recon,
+ * bootstrap normal flow will be okay.
+ * full snapshot: DB Updated, Tasks reprocessed, but before delta DB
applied, Recon restarted or crash
+ * - Om Snapshot number - 100000
+ * - Om Delta snapshot number - 0
+ * - All Om Tasks snapshot number - 100000
+ *
+ * Case 5: This case will force Recon to run reprocess of all OM tasks.
+ * full snapshot: DB Updated, Tasks reprocessed, delta DB updates also
applied, recon restarted or crash,
+ * but all delta tasks not processed.
+ * - Om Snapshot number - 100000
+ * - Om Delta snapshot number - 100010
+ * - All Om Tasks snapshot number - 100000
+ *
+ * Case 6: This case will force Recon to run reprocess of only those OM
tasks whose
+ * last updated sequence number is less than Om Delta snapshot
number.
+ * full snapshot: DB Updated, Tasks reprocessed, delta DB updates also
applied, recon restarted or crash,
+ * but delta tasks not processed.
+ * - Om Snapshot number - 100000
+ * - Om Delta snapshot number - 100010
+ * - Few Om Tasks snapshot number - 100000 , Remaining Om Tasks snapshot
number - 100010
+ *
+ * @param fullSnapshotTaskStatusUpdater
+ * @param deltaTaskStatusUpdater
+ * @return reconOmTaskMap
+ */
+ private Map<String, ReconOmTask> verifyAndGetReconOmTaskMapForReprocess(
+ ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater,
+ ReconTaskStatusUpdater deltaTaskStatusUpdater) {
+
+ Map<String, ReconOmTask> reconOmTaskMap = new HashMap<>();
+
+ for (Map.Entry<String, ReconOmTask> entry :
reconTaskController.getRegisteredTasks().entrySet()) {
+ String taskName = entry.getKey();
+ ReconOmTask task = entry.getValue();
+ ReconTaskStatusUpdater taskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
+
+ if (taskStatusUpdater == null) {
+ continue; // Handle null cases
+ }
+
+ boolean isBootstrapTask =
+ shouldProcessTaskForBootstrap(fullSnapshotTaskStatusUpdater,
deltaTaskStatusUpdater, taskName,
+ taskStatusUpdater);
+ boolean isDeltaTask =
+ shouldProcessTaskForDelta(fullSnapshotTaskStatusUpdater,
deltaTaskStatusUpdater, taskName, taskStatusUpdater);
+
+ if (isBootstrapTask || isDeltaTask) {
+ if (reconOmTaskMap.containsKey(taskName)) {
+ LOG.warn("Duplicate task detected in reconOmTaskMap: {}", taskName);
+ }
+ reconOmTaskMap.put(taskName, task);
+ }
+ }
+
if (!reconOmTaskMap.isEmpty()) {
- LOG.info("Task name and last updated sequence number of tasks, that are
not matching with " +
- "the last updated sequence number of OmDeltaRequest task:\n");
+ LOG.info("Tasks with mismatched last updated sequence numbers:");
LOG.info("{} -> {}", deltaTaskStatusUpdater.getTaskName(),
deltaTaskStatusUpdater.getLastUpdatedSeqNumber());
- reconOmTaskMap.keySet()
- .forEach(taskName -> {
- LOG.info("{} -> {}", taskName,
-
taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber());
- });
+ reconOmTaskMap.forEach((taskName, task) -> {
+ long lastUpdatedSeqNum =
taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber();
+ LOG.info("{} -> {}", taskName, lastUpdatedSeqNum);
+ });
}
- reconTaskController.reInitializeTasks(omMetadataManager, reconOmTaskMap);
- startSyncDataFromOM(initialDelay);
+
+ return reconOmTaskMap;
+ }
+
+ /**
+ * Determines if a task should be processed under the "bootstrap" condition.
+ */
+ private boolean shouldProcessTaskForBootstrap(
+ ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater,
+ ReconTaskStatusUpdater deltaTaskStatusUpdater,
+ String taskName,
+ ReconTaskStatusUpdater taskStatusUpdater) {
+ return fullSnapshotTaskStatusUpdater.getLastUpdatedSeqNumber() > 0
+ && deltaTaskStatusUpdater.getLastUpdatedSeqNumber() == 0
+ && !isOmSnapshotTask(taskName)
Review Comment:
This logic is for re-running the reprocess step of the OM tasks (i.e.,
processing the DB data again), not for fetching the DB updates again.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -269,36 +270,145 @@ public void start() {
.values()
.forEach(ReconOmTask::init);
- // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber number not
matching with
+ // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber is greater than
zero and greater than
+ // 'OmSnapshotRequest' task's lastUpdatedSeqNumber number and not matching
with
// lastUpdatedSeqNumber number for any of the OM task, then just run
reprocess for such tasks.
+
+ ReconTaskStatusUpdater fullSnapshotTaskStatusUpdater =
+
taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmSnapshotRequest.name());
ReconTaskStatusUpdater deltaTaskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmDeltaRequest.name());
- Map<String, ReconOmTask> reconOmTaskMap =
reconTaskController.getRegisteredTasks()
- .entrySet()
- .stream()
- .filter(entry -> {
- String taskName = entry.getKey();
- ReconTaskStatusUpdater taskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
-
- return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) &&
// Condition 1
- !taskStatusUpdater.getLastUpdatedSeqNumber()
- .equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber());
// Condition 2
- })
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// Collect into desired Map
+ Map<String, ReconOmTask> reconOmTaskMap =
+ verifyAndGetReconOmTaskMapForReprocess(fullSnapshotTaskStatusUpdater,
deltaTaskStatusUpdater);
+ reconTaskController.reInitializeTasks(omMetadataManager, reconOmTaskMap);
+ startSyncDataFromOM(initialDelay);
+ }
+
+ /**
+ * This method validates the following cases for OM tasks to be reprocessed
during Recon bootstrap process.
+ *
+ * Case 1: Normal bootstrap flow will take care of this scenario.
+ * full snapshot: DB not Updated
+ * - Om Snapshot number - 0
+ * - Om Delta snapshot number - 0
+ * - All Om Tasks snapshot number - 0
+ *
+ * Case 2: This case will force Recon to run reprocess of only those OM
tasks whose
+ * last updated sequence number is zero.
+ * full snapshot: DB Updated, Tasks not reprocessed, Recon restarted or crash
+ * - Om Snapshot number - 100000
+ * - Om Delta snapshot number - 0
+ * - Few Om Tasks snapshot number - 0, remaining Om tasks snapshot number
- 100000
+ *
+ * Case 3: This case will force Recon to run reprocess of all OM tasks.
+ * full snapshot: DB Updated, Tasks not reprocessed, Recon restarted or crash
+ * - Om Snapshot number - 100000
+ * - Om Delta snapshot number - 0
+ * - All Om Tasks snapshot number - 0
+ *
+ * Case 4: This case will not force to reprocess any OM tasks and on restart
of Recon,
+ * bootstrap normal flow will be okay.
+ * full snapshot: DB Updated, Tasks reprocessed, but before delta DB
applied, Recon restarted or crash
+ * - Om Snapshot number - 100000
+ * - Om Delta snapshot number - 0
+ * - All Om Tasks snapshot number - 100000
+ *
+ * Case 5: This case will force Recon to run reprocess of all OM tasks.
+ * full snapshot: DB Updated, Tasks reprocessed, delta DB updates also
applied, recon restarted or crash,
+ * but all delta tasks not processed.
+ * - Om Snapshot number - 100000
+ * - Om Delta snapshot number - 100010
+ * - All Om Tasks snapshot number - 100000
+ *
+ * Case 6: This case will force Recon to run reprocess of only those OM
tasks whose
Review Comment:
The difference between case 5 and case 6 is that in case 5 all of the OM tasks
have a last updated sequence number behind the delta task's last updated
sequence number, whereas in case 6 only a few tasks have a last updated
sequence number behind the delta task's last updated sequence number. So in
case 5 all OM tasks will go for reprocess, and in case 6 only those OM tasks
will go for reprocess that could not finish processing their delta updates
before Recon crashed or restarted.
On your question - "`if fullSnapshot task not same delta or task snapshot
number, trigger those task.`"
--- I think the condition you mentioned will not, on its own, cover case 4.
Please check: in case 4, with the condition you mentioned, all OM tasks will
go for reprocess, which we don't want.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]