sumitagrawl commented on code in PR #10074:
URL: https://github.com/apache/ozone/pull/10074#discussion_r3215141643


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java:
##########
@@ -432,34 +437,72 @@ public void start() {
     } else {
       initializePipelinesFromScm();
     }
-    LOG.debug("Started the SCM Container Info sync scheduler.");
-    long interval = ozoneConfiguration.getTimeDuration(
-        OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DELAY,
-        OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
-    long initialDelay = ozoneConfiguration.getTimeDuration(
-        OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY,
-        OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
+    // -----------------------------------------------------------------------
+    // Scheduler (incremental/targeted sync): runs every 1h (default).
+    //
+    // Each cycle calls decideSyncAction() — two lightweight count RPCs to SCM
+    // — and then:
+    //
+    //   non-OPEN drift > threshold (default 1,000,000)
+    //       → warn and expose the drift via metrics; full snapshot is not
+    //         downloaded automatically by this periodic task
+    //
+    //   0 < |total drift| <= threshold
+    //       → targeted sync: 4-pass incremental repair
+    //
+    //   total drift = 0 but per-state drift (OPEN, QUASI_CLOSED, or CLOSED)
+    //       >= threshold (default 1)
+    //       → targeted sync: corrects containers stuck in a stale lifecycle 
state
+    //
+    //   no drift detected
+    //       → no action this cycle
+    //
+    // Running this on a 1h cadence means container state discrepancies are
+    // detected and corrected without an unconditional periodic full snapshot.
+    // -----------------------------------------------------------------------
+    long syncInterval = ozoneConfiguration.getTimeDuration(
+        OZONE_RECON_SCM_CONTAINER_SYNC_TASK_INTERVAL_DELAY,
+        OZONE_RECON_SCM_CONTAINER_SYNC_TASK_INTERVAL_DEFAULT, 
TimeUnit.MILLISECONDS);
+    long syncInitialDelay = ozoneConfiguration.getTimeDuration(
+        OZONE_RECON_SCM_CONTAINER_SYNC_TASK_INITIAL_DELAY,
+        OZONE_RECON_SCM_CONTAINER_SYNC_TASK_INITIAL_DELAY_DEFAULT,
         TimeUnit.MILLISECONDS);
-    // This periodic sync with SCM container cache is needed because during
-    // the window when recon will be down and any container being added
-    // newly and went missing, that container will not be reported as missing 
by
-    // recon till there is a difference of container count equivalent to
-    // threshold value defined in "ozone.recon.scm.container.threshold"
-    // between SCM container cache and recon container cache.
+    LOG.debug("Started the SCM Container Info sync scheduler (interval={}ms, 
initialDelay={}ms).",
+        syncInterval, syncInitialDelay);
     scheduler.scheduleWithFixedDelay(() -> {
+      if (!isSyncDataFromSCMRunning.compareAndSet(false, true)) {
+        LOG.debug("SCM container info sync is already running; skipping this 
cycle.");
+        return;
+      }
       try {
-        boolean isSuccess = syncWithSCMContainerInfo();
-        if (!isSuccess) {
-          LOG.debug("SCM container info sync is already running.");
+        ReconStorageContainerSyncHelper.SyncAction action =
+            containerSyncHelper.decideSyncAction();
+        switch (action) {
+        case LARGE_DRIFT_THRESHOLD_EXCEEDED:
+          break;

Review Comment:
   Is db_sync part of this? If no action is taken here, we can still run targetedSync.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java:
##########
@@ -19,87 +19,1055 @@
 
 import static 
org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
 import static 
org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLEANUP;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLOSE;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.DELETE;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.FORCE_CLOSE;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.QUASI_CLOSE;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD_DEFAULT;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import 
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.recon.metrics.ReconScmContainerSyncMetrics;
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Helper class that performs targeted incremental sync between SCM and Recon
+ * container metadata. Executes four passes per sync cycle, all completing in
+ * a single cycle with local pagination — no cross-cycle cursor state except
+ * for Pass 2 (see below):
+ *
+ * <ol>
+ *   <li><b>Pass 1 — CLOSED (SCM-driven, add + correct):</b> paginates SCM's
+ *       CLOSED container ID list; for each page, one batch RPC adds all absent
+ *       containers, and local state-machine transitions correct containers 
stuck
+ *       in OPEN, CLOSING, or QUASI_CLOSED in Recon but already CLOSED in 
SCM.</li>
+ *   <li><b>Pass 2 — OPEN (SCM-driven, add only, incremental cursor):</b> scans
+ *       only newly created OPEN containers starting from the last-seen ID
+ *       ({@code pass2OpenStartContainerId}). OPEN container IDs are monotonic 
so
+ *       rescanning from 1 every cycle would be wasteful; the cursor advances
+ *       forward only and never needs to go backward.</li>
+ *   <li><b>Pass 3 — QUASI_CLOSED (SCM-driven, add + correct):</b> paginates 
SCM's
+ *       QUASI_CLOSED list; adds absent containers (fast path via 
pipeline-capable
+ *       batch RPC; fallback via {@code addContainerInfoFallback} for 
containers
+ *       with zero viable replicas whose pipeline cannot be resolved) and 
corrects
+ *       containers stuck OPEN or CLOSING in Recon.</li>
+   *   <li><b>Pass 4 — DELETED retirement (SCM-driven, transition only):</b>
+   *       paginates SCM's DELETED list using {@code getListOfContainerIDs} 
(IDs
+   *       only, 12 bytes each, no pipeline resolution). For each ID SCM 
reports
+   *       as DELETED, Recon drives the container atomically from
+   *       CLOSED/QUASI_CLOSED → DELETING → DELETED in a single call. The
+   *       DELETING list is intentionally skipped to avoid leaving Recon in an
+   *       intermediate DELETING state across cycles.</li>
+ * </ol>
+ *
+ * <h3>Scalability at 100M containers</h3>
+ * <ul>
+ *   <li>{@link #decideSyncAction()} uses {@link
+ *       
org.apache.hadoop.hdds.scm.container.ContainerManager#getTotalContainerCount()}
+ *       (O(number of states), all O(1) per-state lookups) — never calls
+ *       {@code getContainers()} which would allocate a 20-40 GB
+ *       {@code List<ContainerInfo>} on every 6h decision tick.</li>
+ *   <li>Pass 1 and Pass 3 issue <b>one</b> {@code 
getExistContainerWithPipelinesInBatch}
+ *       RPC per sub-batch of absent containers — not one per absent container.
+ *       Sub-batch size is bounded by {@link 
#safeContainerWithPipelineBatchSize}
+ *       to keep the CWP response within the 128 MB IPC limit.</li>
+   *   <li>Pass 4 uses {@code getListOfContainerIDs} (IDs only, 12 
bytes/container)
+   *       against SCM's DELETED list only. Even 1 billion deleted containers
+   *       require only ~100 page calls per cycle at the 128 MB IPC limit.</li>
+ * </ul>
+ */
 class ReconStorageContainerSyncHelper {
 
-  // Serialized size of one ContainerID proto on the wire (varint tag + 8-byte 
long = ~12 bytes).
-  // Used to derive the maximum batch size that fits within 
ipc.maximum.data.length.
+  /**
+   * Wire size of one {@code ContainerID} proto (varint tag + 8-byte long ≈ 12 
bytes).
+   * Used to compute the maximum number of IDs that fit in one
+   * {@code getListOfContainerIDs} RPC call, where both the request (IDs sent
+   * to SCM) and the response (IDs returned by SCM) carry only ContainerID 
entries.
+   * Applies to Pass 1, Pass 2, Pass 3 pagination and Pass 4
+   * (DELETED ID list).
+   */
   private static final long CONTAINER_ID_PROTO_SIZE_BYTES = 12;
 
+  /**
+   * Conservative wire-size upper bound for one {@code ContainerInfo} proto
+   * response entry (used by {@code getListOfContainerInfos}).
+   *
+   * <p>ContainerInfo carries only container metadata — no pipeline, no
+   * DatanodeDetails. Measured estimate: containerID(8) + state(2) +
+   * usedBytes(8) + numberOfKeys(8) + owner(~20) + replicationType(2) +
+   * replicationFactor(2) + stateEnterTime(8) + sequenceId(8) +
+   * pipelineID(~20) ≈ 86 bytes. This constant uses <b>128 bytes</b>
+   * (~1.5× measured) as a safety margin.
+   *
+   * <p>Safe max batch for {@code getListOfContainerInfos} at 128 MB IPC limit:
+   * <pre>
+   *   128 MB / 128 bytes = 1,048,576 containers per call
+   *   (actual bytes: 1,048,576 × 86 ≈ 86 MB — well within limit)
+   * </pre>
+   *
+   * <p>This is ~8× larger than {@link 
#CONTAINER_WITH_PIPELINE_PROTO_SIZE_BYTES}
+   * because ContainerInfo does not include the Pipeline and DatanodeDetails
+   * entries that dominate the ContainerWithPipeline size.
+   */
+  private static final long CONTAINER_INFO_PROTO_SIZE_BYTES = 128;
+
+  /**
+   * Conservative wire-size upper bound for one {@code ContainerWithPipeline}
+   * proto response entry.
+   *
+   * <p>Measured estimate: ContainerInfoProto ~120 bytes + PipelineProto with 3
+   * DatanodeDetailsProto entries ~370 bytes ≈ 490 bytes. This constant uses
+   * <b>1024 bytes</b> — approximately 2× the measured value — to provide a
+   * comfortable safety margin against larger deployments where hostnames,
+   * certificates, or additional port entries grow the proto beyond the 
estimate.
+   *
+   * <p>This constant is used exclusively to bound the <em>response</em> of
+   * {@code getExistContainerWithPipelinesInBatch}. The <em>request</em> 
carries
+   * only container IDs and is bounded by {@link 
#CONTAINER_ID_PROTO_SIZE_BYTES}.
+   * The two constants are different because the request and response payloads
+   * have vastly different sizes (12 bytes vs ~490 bytes per entry).
+   *
+   * <h3>Safe batch limits at the 128 MB default IPC ceiling</h3>
+   * <p>{@code IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 134,217,728 bytes = 128 MB}
+   * (verified from Hadoop 3.x {@code CommonConfigurationKeys}).
+   * <pre>
+   *   Single-state CWP call (Pass 1/3 absent-container adds):
+   *     128 MB / 1024 bytes = 131,072 containers per call
+   *     (actual bytes: 131,072 × 490 ≈ 61 MB — well within limit)
+   * </pre>
+   *
+   * @see #safeContainerWithPipelineBatchSize(int)
+   */
+  private static final long CONTAINER_WITH_PIPELINE_PROTO_SIZE_BYTES = 1024;
+
+  /**
+   * Monotonic cursor for Pass 2 (OPEN add-only sync). OPEN containers are
+   * created with increasing container IDs, so each cycle only needs to scan
+   * from the last-seen ID onward rather than rescanning the full OPEN set.
+   *
+   * <p>{@link AtomicLong} rather than {@code volatile long}: provides the same
+   * visibility guarantee but expresses concurrent intent explicitly through 
the
+   * type, following standard Java concurrency conventions. The CAS mutex in
+   * {@link ReconStorageContainerManagerFacade} ensures a single writer, so
+   * compound-atomic operations ({@code compareAndSet}, {@code getAndAdd}) are
+   * not needed — only {@code get()} and {@code set()} are used.
+   */
+  private final AtomicLong pass2OpenStartContainerId = new AtomicLong(1L);
+
   private static final Logger LOG = LoggerFactory
       .getLogger(ReconStorageContainerSyncHelper.class);
 
   private final StorageContainerServiceProvider scmServiceProvider;
   private final OzoneConfiguration ozoneConfiguration;
   private final ReconContainerManager containerManager;
+  private final ReconScmContainerSyncMetrics metrics;
+
+  /**
+   * Describes the action that the periodic scheduler should take based on the
+   * observed drift between SCM and Recon container metadata.
+   */
+  public enum SyncAction {
+    /**
+     * No drift detected — no sync work needed this cycle.
+     */
+    NO_ACTION,
+
+    /**
+     * Small or per-state drift detected — run the four-pass targeted sync.
+     * This is the normal steady-state response: cheaper than a full snapshot
+     * and sufficient for the vast majority of drift scenarios.
+     */
+    TARGETED_SYNC,
+
+    /**
+     * Large non-OPEN drift exceeded the configured threshold. The periodic
+     * scheduler records this condition through logs and metrics, but does not
+     * automatically download an SCM DB snapshot.
+     */
+    LARGE_DRIFT_THRESHOLD_EXCEEDED
+  }
 
   ReconStorageContainerSyncHelper(StorageContainerServiceProvider 
scmServiceProvider,
                                   OzoneConfiguration ozoneConfiguration,
                                   ReconContainerManager containerManager) {
+    this(scmServiceProvider, ozoneConfiguration, containerManager, null);
+  }
+
+  ReconStorageContainerSyncHelper(StorageContainerServiceProvider 
scmServiceProvider,
+                                  OzoneConfiguration ozoneConfiguration,
+                                  ReconContainerManager containerManager,
+                                  ReconScmContainerSyncMetrics metrics) {
     this.scmServiceProvider = scmServiceProvider;
     this.ozoneConfiguration = ozoneConfiguration;
     this.containerManager = containerManager;
+    this.metrics = metrics;
   }
 
+  /**
+   * Decides what sync action the periodic scheduler should take based on the
+   * observed drift between SCM and Recon.
+   *
+   * <p>Decision logic:
+   * <ol>
+   *   <li>If {@code |(SCM_total - SCM_open) - (Recon_total - Recon_open)| >
+   *       ozone.recon.scm.container.threshold} (default 1,000,000): return
+   *       {@link SyncAction#LARGE_DRIFT_THRESHOLD_EXCEEDED}. Large drift in
+   *       non-OPEN containers is surfaced through logs and metrics instead of
+   *       triggering an automatic SCM DB snapshot replacement.</li>
+   *   <li>If total drift is positive but the non-OPEN drift is at or below the
+   *       threshold: return {@link SyncAction#TARGETED_SYNC}. This keeps large
+   *       OPEN-only gaps on the incremental path because missing OPEN
+   *       containers can be repaired cheaply without replacing the full SCM 
DB.</li>
+   *   <li>If total drift is zero, check per-state drift for OPEN, 
QUASI_CLOSED,
+   *       and CLOSED against {@code ozone.recon.scm.per.state.drift.threshold}
+   *       (default 5):
+   *       <ul>
+   *         <li><b>OPEN</b>: detects containers stuck OPEN in Recon after SCM
+   *             has advanced them to QUASI_CLOSED or CLOSED.</li>
+   *         <li><b>QUASI_CLOSED</b>: detects containers stuck QUASI_CLOSED in
+   *             Recon after SCM has advanced them to CLOSED.</li>
+   *         <li><b>CLOSED</b>: detects CLOSED count mismatch when total counts
+   *             are equal (e.g., OPEN and QUASI_CLOSED counts are also equal 
but
+   *             some containers are in the wrong bucket).</li>
+   *       </ul>
+   *       If drift in <em>any</em> checked state reaches the threshold:
+   *       return {@link SyncAction#TARGETED_SYNC}.</li>
+   *   <li>Otherwise: return {@link SyncAction#NO_ACTION}.</li>
+   * </ol>
+   *
+   * <p>Per-state drift deliberately routes to targeted sync, not a full
+   * snapshot — the targeted sync's per-state passes correct each condition
+   * efficiently without replacing the entire database.
+   *
+   * @return the recommended {@link SyncAction}
+   * @throws IOException if SCM RPC calls to retrieve counts fail
+   */
+  public SyncAction decideSyncAction() throws IOException {
+    int largeThreshold = ozoneConfiguration.getInt(
+        OZONE_RECON_SCM_CONTAINER_THRESHOLD,
+        OZONE_RECON_SCM_CONTAINER_THRESHOLD_DEFAULT);
+    int perStateDriftThreshold = ozoneConfiguration.getInt(
+        OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD,
+        OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD_DEFAULT);
+
+    // All per-state counts use O(1) getContainerStateCount() — no heap 
allocation,
+    // no container list loading. OPEN and QUASI_CLOSED are read here because 
they
+    // are needed for both the Check 1 non-OPEN drift and Check 2 per-state 
checks.
+    // reconClosed is read later in Check 2 where it is first needed.
+    // getTotalContainerCount() sums all LifeCycleState values via the same 
O(1)
+    // per-state lookups.
+    long reconOpen = 
containerManager.getContainerStateCount(HddsProtos.LifeCycleState.OPEN);
+    long reconQuasiClosed =
+        
containerManager.getContainerStateCount(HddsProtos.LifeCycleState.QUASI_CLOSED);
+    long reconTotal = containerManager.getTotalContainerCount();
+
+    // --- Check 1: large non-OPEN drift exceeds threshold ---
+    long scmTotal = scmServiceProvider.getContainerCount();
+    long scmOpen = 
scmServiceProvider.getContainerCount(HddsProtos.LifeCycleState.OPEN);
+    long totalDrift = Math.abs(scmTotal - reconTotal);
+    long scmNonOpen = Math.max(0, scmTotal - scmOpen);
+    long reconNonOpen = Math.max(0, reconTotal - reconOpen);
+    long nonOpenDrift = Math.abs(scmNonOpen - reconNonOpen);
+
+    if (nonOpenDrift > largeThreshold) {
+      LOG.warn("Tiered sync decision: LARGE_DRIFT_THRESHOLD_EXCEEDED. "
+              + "Non-OPEN container drift {} exceeds threshold {} "
+              + "(SCM_non_OPEN={}, Recon_non_OPEN={}, SCM_total={}, 
Recon_total={}). "
+              + "Periodic full SCM DB snapshot download is disabled; "
+              + "skipping automatic SCM checkpoint replacement and recording "
+              + "large-drift threshold-exceeded event. Check Recon metrics "
+              + "fullScmDbSnapshotThresholdExceededCount, "
+              + "lastFullScmDbSnapshotThresholdExceededNonOpenDrift, and "
+              + "intervalSinceLastFullScmDbSnapshotThresholdExceededMs.",
+          nonOpenDrift, largeThreshold, scmNonOpen, reconNonOpen, scmTotal, 
reconTotal);
+      if (metrics != null) {
+        metrics.recordFullSnapshotThresholdExceededEvent(nonOpenDrift);
+      }
+      return SyncAction.LARGE_DRIFT_THRESHOLD_EXCEEDED;
+    }
+    if (totalDrift > 0) {

Review Comment:
   This logic says that any difference in container count results in a sync, and
perStateDrift > 1 also leads to a sync, so it will sync under almost every
condition.
   So overall, this complicated logic is not required here.
   DELETED: this state is also counted and may cause a sync every time if there
is always some difference in it, even after the system is stable.
   
   We can have a simple decision on whether a sync is needed, like:
   1. OPEN - can always sync if drift > 0
   2. CLOSED / QUASI_CLOSED: either a configured value > 0 or a minimum threshold
like 1000, as before



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java:
##########
@@ -184,17 +201,47 @@ public final class  ReconServerConfigKeys {
 
   public static final int 
OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY_DEFAULT = 2000;
 
+  public static final String OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY =

Review Comment:
   It seems this config has been removed; please recheck.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java:
##########
@@ -550,77 +594,134 @@ private void initializeSCMDB() {
 
   public void updateReconSCMDBWithNewSnapshot() throws IOException {
     if (isSyncDataFromSCMRunning.compareAndSet(false, true)) {
-      DBCheckpoint dbSnapshot = scmServiceProvider.getSCMDBSnapshot();
-      if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
-        LOG.info("Got new checkpoint from SCM : " +
-            dbSnapshot.getCheckpointLocation());
-        try {
-          initializeNewRdbStore(dbSnapshot.getCheckpointLocation().toFile());
-        } catch (IOException e) {
-          LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e);
+      try {
+        DBCheckpoint dbSnapshot = scmServiceProvider.getSCMDBSnapshot();
+        if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
+          LOG.info("Got new checkpoint from SCM : " +
+              dbSnapshot.getCheckpointLocation());
+          try {
+            initializeNewRdbStore(dbSnapshot.getCheckpointLocation().toFile());
+          } catch (IOException e) {
+            LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e);
+          }
+        } else {
+          LOG.error("Null snapshot location got from SCM.");
         }
-      } else {
-        LOG.error("Null snapshot location got from SCM.");
+      } finally {
+        isSyncDataFromSCMRunning.compareAndSet(true, false);
       }
     } else {
       LOG.warn("SCM DB sync is already running.");
     }
   }
 
+  /**
+   * Runs the four-pass targeted sync unconditionally (all states: CLOSED,
+   * OPEN, QUASI_CLOSED, and DELETED). This method is the direct
+   * entry point for the REST trigger endpoint
+   * {@code POST /api/v1/triggerdbsync/scm} and for any caller that explicitly
+   * wants an incremental sync rather than a drift-evaluated decision.
+   *
+   * <p>For the periodic scheduler the tiered
+   * {@link ReconStorageContainerSyncHelper#decideSyncAction()} path is used
+   * instead, which may report large drift, run targeted sync, or skip work
+   * entirely depending on observed drift.
+   */
   public boolean syncWithSCMContainerInfo() {
     if (isSyncDataFromSCMRunning.compareAndSet(false, true)) {
-      return containerSyncHelper.syncWithSCMContainerInfo();
+      try {
+        return runTargetedSyncWithMetrics();
+      } finally {
+        isSyncDataFromSCMRunning.compareAndSet(true, false);
+      }
     } else {
       LOG.debug("SCM DB sync is already running.");
       return false;
     }
   }
 
-  private void deleteOldSCMDB() throws IOException {
-    if (dbStore != null) {
-      File oldDBLocation = dbStore.getDbLocation();
-      if (oldDBLocation.exists()) {
-        LOG.info("Cleaning up old SCM snapshot db at {}.",
-            oldDBLocation.getAbsolutePath());
-        FileUtils.deleteDirectory(oldDBLocation);
-      }
+  private boolean runTargetedSyncWithMetrics() {
+    long startTime = Time.monotonicNow();
+    containerSyncMetrics.setTargetedSyncStatus(
+        ReconScmContainerSyncMetrics.TARGETED_SYNC_STATUS_IN_PROGRESS);
+    try {
+      boolean success = containerSyncHelper.syncWithSCMContainerInfo();
+      containerSyncMetrics.setTargetedSyncStatus(success
+          ? ReconScmContainerSyncMetrics.TARGETED_SYNC_STATUS_SUCCESS
+          : ReconScmContainerSyncMetrics.TARGETED_SYNC_STATUS_FAILURE);
+      return success;
+    } catch (RuntimeException | Error e) {
+      containerSyncMetrics.setTargetedSyncStatus(
+          ReconScmContainerSyncMetrics.TARGETED_SYNC_STATUS_FAILURE);
+      throw e;
+    } finally {
+      containerSyncMetrics.setLastTargetedSyncDurationMs(
+          Time.monotonicNow() - startTime);
+    }
+  }
+
+  private void deleteSCMDB(File dbLocation) throws IOException {
+    if (dbLocation != null && dbLocation.exists()) {
+      LOG.info("Cleaning up old SCM snapshot db at {}.",
+          dbLocation.getAbsolutePath());
+      FileUtils.deleteDirectory(dbLocation);
     }
   }
 
   private void initializeNewRdbStore(File dbFile) throws IOException {
-    try {
-      final DBStore newStore = DBStoreBuilder.newBuilder(ozoneConfiguration, 
ReconSCMDBDefinition.get(), dbFile)
-          .build();
-      final Table<DatanodeID, DatanodeDetails> nodeTable = 
ReconSCMDBDefinition.NODES.getTable(dbStore);
-      final Table<DatanodeID, DatanodeDetails> newNodeTable = 
ReconSCMDBDefinition.NODES.getTable(newStore);
-      try (TableIterator<DatanodeID, ? extends KeyValue<DatanodeID, 
DatanodeDetails>> iterator = nodeTable.iterator()) {
+    final DBStore oldStore = dbStore;

Review Comment:
   This part of the code belongs to another PR where we are supporting DB sync.
Also, how is this operation atomic? For example, is the old DB deleted before
initializing with the new DB? What happens during that time?



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java:
##########
@@ -432,34 +437,72 @@ public void start() {
     } else {
       initializePipelinesFromScm();
     }
-    LOG.debug("Started the SCM Container Info sync scheduler.");
-    long interval = ozoneConfiguration.getTimeDuration(
-        OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DELAY,
-        OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
-    long initialDelay = ozoneConfiguration.getTimeDuration(
-        OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY,
-        OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
+    // -----------------------------------------------------------------------
+    // Scheduler (incremental/targeted sync): runs every 1h (default).

Review Comment:
   Keep the comment in sync with the default config, or refer to the variable
instead of an absolute value.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java:
##########
@@ -550,77 +594,134 @@ private void initializeSCMDB() {
 
   public void updateReconSCMDBWithNewSnapshot() throws IOException {
     if (isSyncDataFromSCMRunning.compareAndSet(false, true)) {
-      DBCheckpoint dbSnapshot = scmServiceProvider.getSCMDBSnapshot();
-      if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
-        LOG.info("Got new checkpoint from SCM : " +
-            dbSnapshot.getCheckpointLocation());
-        try {
-          initializeNewRdbStore(dbSnapshot.getCheckpointLocation().toFile());
-        } catch (IOException e) {
-          LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e);
+      try {
+        DBCheckpoint dbSnapshot = scmServiceProvider.getSCMDBSnapshot();
+        if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
+          LOG.info("Got new checkpoint from SCM : " +
+              dbSnapshot.getCheckpointLocation());
+          try {
+            initializeNewRdbStore(dbSnapshot.getCheckpointLocation().toFile());
+          } catch (IOException e) {
+            LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e);
+          }
+        } else {
+          LOG.error("Null snapshot location got from SCM.");
         }
-      } else {
-        LOG.error("Null snapshot location got from SCM.");
+      } finally {
+        isSyncDataFromSCMRunning.compareAndSet(true, false);
       }
     } else {
       LOG.warn("SCM DB sync is already running.");
     }
   }
 
+  /**
+   * Runs the four-pass targeted sync unconditionally (all states: CLOSED,
+   * OPEN, QUASI_CLOSED, and DELETED). This method is the direct
+   * entry point for the REST trigger endpoint
+   * {@code POST /api/v1/triggerdbsync/scm} and for any caller that explicitly
+   * wants an incremental sync rather than a drift-evaluated decision.
+   *
+   * <p>For the periodic scheduler the tiered
+   * {@link ReconStorageContainerSyncHelper#decideSyncAction()} path is used
+   * instead, which may report large drift, run targeted sync, or skip work
+   * entirely depending on observed drift.
+   */
   public boolean syncWithSCMContainerInfo() {

Review Comment:
   Do we still need syncWithSCMContainerInfo() here, given that the code has
moved to targetedSyncHelper?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to