ArafatKhan2198 commented on code in PR #9243:
URL: https://github.com/apache/ozone/pull/9243#discussion_r2508929049
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -702,145 +702,153 @@ public boolean syncDataFromOM() {
try {
long currentSequenceNumber = getCurrentOMDBSequenceNumber();
LOG.info("Seq number of Recon's OM DB : {}", currentSequenceNumber);
- boolean fullSnapshot = false;
+ boolean fullSnapshot = true;
+
- if (currentSequenceNumber <= 0) {
- fullSnapshot = true;
+ if (reInitializeTasksCalled.compareAndSet(false, true)) {
+ LOG.info("Calling reprocess on Recon tasks.");
+ reconTaskController.reInitializeTasks(omMetadataManager, null);
} else {
- // Get updates from OM and apply to local Recon OM DB and update
task status in table
- deltaReconTaskStatusUpdater.recordRunStart();
- int loopCount = 0;
- long fromSequenceNumber = currentSequenceNumber;
- long diffBetweenOMDbAndReconDBSeqNumber = deltaUpdateLimit + 1;
- /**
- * This loop will continue to fetch and apply OM DB updates and with
every
- * OM DB fetch request, it will fetch {@code deltaUpdateLimit} count
of DB updates.
- * It continues to fetch from OM till the lag, between OM DB WAL
sequence number
- * and Recon OM DB snapshot WAL sequence number, is less than this
lag threshold value.
- * In high OM write TPS cluster, this simulates continuous pull from
OM without any delay.
- */
- while (diffBetweenOMDbAndReconDBSeqNumber > omDBLagThreshold) {
- try (OMDBUpdatesHandler omdbUpdatesHandler =
- new OMDBUpdatesHandler(omMetadataManager)) {
-
- // If interrupt was previously signalled,
- // we should check for it before starting delta update sync.
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException("Thread interrupted during
delta update.");
- }
- diffBetweenOMDbAndReconDBSeqNumber =
- getAndApplyDeltaUpdatesFromOM(currentSequenceNumber,
omdbUpdatesHandler);
- deltaReconTaskStatusUpdater.setLastTaskRunStatus(0);
- // Keeping last updated sequence number for both full and delta
tasks to be same
- // because sequence number of DB denotes and points to same OM
DB copy of Recon,
- // even though two different tasks are updating the DB at
different conditions, but
- // it tells the sync state with actual OM DB for the same Recon
OM DB copy.
-
deltaReconTaskStatusUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
-
fullSnapshotReconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
- deltaReconTaskStatusUpdater.recordRunCompletion();
- fullSnapshotReconTaskUpdater.updateDetails();
- // Update the current OM metadata manager in task controller
- reconTaskController.updateOMMetadataManager(omMetadataManager);
-
- // Pass on DB update events to tasks that are listening.
- reconTaskController.consumeOMEvents(new OMUpdateEventBatch(
- omdbUpdatesHandler.getEvents(),
omdbUpdatesHandler.getLatestSequenceNumber()), omMetadataManager);
-
- // Check if task reinitialization is needed due to buffer
overflow or task failures
- boolean bufferOverflowed =
reconTaskController.hasEventBufferOverflowed();
- boolean tasksFailed = reconTaskController.hasTasksFailed();
-
- if (bufferOverflowed || tasksFailed) {
- ReconTaskReInitializationEvent.ReInitializationReason reason =
bufferOverflowed ?
-
ReconTaskReInitializationEvent.ReInitializationReason.BUFFER_OVERFLOW :
-
ReconTaskReInitializationEvent.ReInitializationReason.TASK_FAILURES;
-
- LOG.warn("Detected condition for task reinitialization: {},
queueing async reinitialization event",
- reason);
-
- markDeltaTaskStatusAsFailed(deltaReconTaskStatusUpdater);
-
- // Queue async reinitialization event - checkpoint creation
and retry logic is handled internally
- ReconTaskController.ReInitializationResult result =
- reconTaskController.queueReInitializationEvent(reason);
-
- //TODO: Create a metric to track this event buffer overflow or
task failure event
- boolean triggerFullSnapshot =
- Optional.ofNullable(result)
- .map(r -> {
- switch (r) {
- case MAX_RETRIES_EXCEEDED:
- LOG.warn(
- "Reinitialization queue failures exceeded
maximum retries, triggering full snapshot " +
- "fallback");
- return true;
-
- case RETRY_LATER:
- LOG.debug("Reinitialization event queueing will be
retried in next iteration");
- return false;
-
- default:
- LOG.info("Reinitialization event successfully
queued");
- return false;
- }
- })
- .orElseGet(() -> {
- LOG.error(
- "ReInitializationResult is null, something went
wrong in queueing reinitialization " +
- "event");
- return true;
- });
-
- if (triggerFullSnapshot) {
- fullSnapshot = true;
- }
- }
- currentSequenceNumber = getCurrentOMDBSequenceNumber();
- LOG.debug("Updated current sequence number: {}",
currentSequenceNumber);
- loopCount++;
- } catch (InterruptedException intEx) {
- LOG.error("OM DB Delta update sync thread was interrupted and
delta sync failed.");
- // We are updating the table even if it didn't run i.e. got
interrupted beforehand
- // to indicate that a task was supposed to run, but it didn't.
- markDeltaTaskStatusAsFailed(deltaReconTaskStatusUpdater);
- Thread.currentThread().interrupt();
- // Since thread is interrupted, we do not fall back to snapshot
sync.
- // Return with sync failed status.
- return false;
- } catch (Exception e) {
- markDeltaTaskStatusAsFailed(deltaReconTaskStatusUpdater);
- LOG.warn("Unable to get and apply delta updates from OM: {},
falling back to full snapshot",
- e.getMessage());
- fullSnapshot = true;
- }
- if (fullSnapshot) {
- break;
- }
- }
- LOG.info("Delta updates received from OM : {} loops, {} records",
loopCount,
- getCurrentOMDBSequenceNumber() - fromSequenceNumber);
+ LOG.info("reInitializeTasks already called once; skipping.");
}
- if (fullSnapshot) {
- try {
- executeFullSnapshot(fullSnapshotReconTaskUpdater,
deltaReconTaskStatusUpdater);
- } catch (InterruptedException intEx) {
- LOG.error("OM DB Snapshot update sync thread was interrupted.");
- fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
- fullSnapshotReconTaskUpdater.recordRunCompletion();
- Thread.currentThread().interrupt();
- // Mark sync status as failed.
- return false;
- } catch (Exception e) {
- metrics.incrNumSnapshotRequestsFailed();
- fullSnapshotReconTaskUpdater.setLastTaskRunStatus(-1);
- fullSnapshotReconTaskUpdater.recordRunCompletion();
- LOG.error("Unable to update Recon's metadata with new OM DB. ", e);
- // Update health status in ReconContext
- reconContext.updateHealthStatus(new AtomicBoolean(false));
-
reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
- }
- }
+// if (currentSequenceNumber <= 0) {
Review Comment:
Apologies — I made those changes for testing purposes only.
I will remove them.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]