sumitagrawl commented on code in PR #10074:
URL: https://github.com/apache/ozone/pull/10074#discussion_r3258249383


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconScmContainerSyncMetrics.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.metrics;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.util.Time;
+
+/**
+ * Metrics for Recon SCM container sync decisions and targeted sync execution.
+ */
+@InterfaceAudience.Private
+@Metrics(about = "Recon SCM Container Sync Metrics", context = 
OzoneConsts.OZONE)
+public final class ReconScmContainerSyncMetrics {
+
+  private static final String SOURCE_NAME =
+      ReconScmContainerSyncMetrics.class.getSimpleName();
+
+  /**
+   * No targeted sync has run yet, or the latest scheduler cycle did not run 
one.
+   */
+  public static final int TARGETED_SYNC_STATUS_IDLE = 0;
+  /**
+   * Targeted sync is currently running.
+   */
+  public static final int TARGETED_SYNC_STATUS_IN_PROGRESS = 1;
+  /**
+   * The last targeted sync completed successfully.
+   */
+  public static final int TARGETED_SYNC_STATUS_SUCCESS = 2;
+  /**
+   * The last targeted sync completed with one or more failed passes.
+   */
+  public static final int TARGETED_SYNC_STATUS_FAILURE = 3;
+
+  @Metric(about = "Count of events where non-OPEN container drift exceeded "
+      + "the full SCM DB snapshot threshold")
+  private MutableCounterLong fullScmDbSnapshotThresholdExceededCount;
+
+  @Metric(about = "Last non-OPEN container drift observed when the full SCM "
+      + "DB snapshot threshold was exceeded")
+  private MutableGaugeLong lastFullScmDbSnapshotThresholdExceededNonOpenDrift;
+
+  @Metric(about = "Time between the last two full SCM DB snapshot threshold "
+      + "exceeded events in milliseconds")
+  private MutableGaugeLong 
intervalSinceLastFullScmDbSnapshotThresholdExceededMs;

Review Comment:
   We can remove the metrics related to the full SCM download.



##########
hadoop-hdds/common/src/main/resources/ozone-default.xml:
##########
@@ -4608,20 +4604,32 @@
   </property>
 
   <property>
-    <name>ozone.recon.scm.snapshot.task.initial.delay</name>
-    <value>1m</value>
-    <tag>OZONE, MANAGEMENT, RECON</tag>
+    <name>ozone.recon.scm.container.sync.task.initial.delay</name>
+    <value>2m</value>
+    <tag>OZONE, MANAGEMENT, RECON, SCM</tag>
     <description>
-      Initial delay in MINUTES by Recon to request SCM DB Snapshot.
+      Initial delay before Recon starts the incremental SCM container sync 
task.
+      This gives Recon startup enough time to initialize the SCM DB before the
+      first incremental sync runs.
     </description>
   </property>
-
   <property>
-    <name>ozone.recon.scm.snapshot.task.interval.delay</name>
-    <value>24h</value>
-    <tag>OZONE, MANAGEMENT, RECON</tag>
+    <name>ozone.recon.scm.container.sync.task.interval.delay</name>
+    <value>6h</value>
+    <tag>OZONE, MANAGEMENT, RECON, SCM</tag>
+    <description>
+      Interval between incremental SCM container sync runs in Recon. Each cycle
+      evaluates drift between SCM and Recon and either runs the targeted
+      multi-pass sync or takes no action.
+    </description>
+  </property>
+  <property>
+    <name>ozone.recon.scm.deleted.container.check.batch.size</name>
+    <value>500</value>

Review Comment:
   The code has 1 million, but the conf has 500; they should be the same.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java:
##########
@@ -179,33 +185,175 @@ public void checkAndAddNewContainerBatch(
   }
 
   /**
-   *  Check if container state is not open. In SCM, container state
-   *  changes to CLOSING first, and then the close command is pushed down
-   *  to Datanodes. Recon 'learns' this from DN, and hence replica state
-   *  will move container state to 'CLOSING'.
+   * Returns the total number of containers across all lifecycle states.
    *
-   * @param containerID containerID to check
-   * @param state  state to be compared
+   * <p>This mirrors SCM's {@code ContainerManager#getTotalContainerCount()}
+   * implementation: it sums per-state counts, where each lookup is O(1),
+   * instead of loading container records.
+   *
+   * @return total container count
    */
+  @Override
+  public long getTotalContainerCount() {
+    long total = 0;
+    for (HddsProtos.LifeCycleState state : HddsProtos.LifeCycleState.values()) 
{
+      total += getContainerStateCount(state);
+    }
+    return total;
+  }
 
-  private void checkContainerStateAndUpdate(ContainerID containerID,
-                                            ContainerReplicaProto.State state)
-          throws IOException, InvalidStateTransitionException {
-    ContainerInfo containerInfo = getContainer(containerID);
-    if (containerInfo.getState().equals(HddsProtos.LifeCycleState.OPEN)
-        && !state.equals(ContainerReplicaProto.State.OPEN)
-        && isHealthy(state)) {
-      LOG.info("Container {} has state OPEN, but given state is {}.",
-          containerID, state);
-      final PipelineID pipelineID = containerInfo.getPipelineID();
-      // subtract open container count from the map
+  /**
+   * Transitions a container from OPEN to CLOSING, keeping the per-pipeline
+   * open-container count in {@link #pipelineToOpenContainer} accurate.
+   *
+   * <p>Must be called whenever an OPEN container is moved to CLOSING so that
+   * the pipeline's open-container count stays consistent.  Both the DN-report
+   * driven path ({@link #checkContainerStateAndUpdate}) and the periodic sync
+   * passes ({@code processSyncedClosedContainer}, {@code 
syncQuasiClosedContainers})
+   * use this method to avoid divergence in the count exposed to the Recon 
Node API.
+   *
+   * <p>If the container was recorded without a pipeline (null pipeline at
+   * {@code addNewContainer} time) the count decrement is safely skipped.
+   *
+   * @param containerID   container to advance from OPEN to CLOSING
+   * @param containerInfo already-fetched {@code ContainerInfo} for the 
container
+   *                      (avoids a redundant lookup inside this method)
+   * @throws IOException                     if the state update fails
+   * @throws InvalidStateTransitionException if the container is not in OPEN 
state
+   */
+  void transitionOpenToClosing(ContainerID containerID, ContainerInfo 
containerInfo)
+      throws IOException, InvalidStateTransitionException {
+    PipelineID pipelineID = containerInfo.getPipelineID();
+    if (pipelineID != null) {
       int curCnt = pipelineToOpenContainer.getOrDefault(pipelineID, 0);
       if (curCnt == 1) {
         pipelineToOpenContainer.remove(pipelineID);
       } else if (curCnt > 0) {
         pipelineToOpenContainer.put(pipelineID, curCnt - 1);
       }
-      updateContainerState(containerID, FINALIZE);
+    }
+    updateContainerState(containerID, FINALIZE);  // OPEN → CLOSING
+  }
+
+  /**
+   * Check if container state needs to advance based on a DN replica report and
+   * SCM's authoritative lifecycle state.
+   *
+   * <p>Two scenarios handled:
+   * <ol>
+   *   <li>OPEN in Recon + non-OPEN healthy replica → FINALIZE (OPEN→CLOSING),
+   *       then query SCM to advance further if possible.</li>
+   *   <li>CLOSING in Recon + any report → query SCM to advance to
+   *       QUASI_CLOSED or CLOSED if SCM has already moved there.</li>
+   *   <li>DELETED in Recon + live replica report → rehydrate the container 
from
+   *       SCM if SCM still records it in a live state such as QUASI_CLOSED or
+   *       CLOSED.</li>
+   * </ol>
+   *
+   * <p>Querying SCM for the authoritative state prevents containers from 
getting
+   * permanently stuck at CLOSING when the DN report that would normally
+   * trigger the next transition was missed (e.g., Recon downtime).
+   *
+   * @param containerID containerID to check
+   * @param replicaState replica state reported by DataNode
+   */
+  private void checkContainerStateAndUpdate(ContainerID containerID,
+                                            ContainerReplicaProto.State 
replicaState)
+      throws IOException, InvalidStateTransitionException {
+    ContainerInfo containerInfo = getContainer(containerID);

Review Comment:
   Fixing the move of a container from deleted to closed, or other handling 
from ICR/FCR, can be done in a different JIRA.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java:
##########
@@ -19,87 +19,881 @@
 
 import static 
org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
 import static 
org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLEANUP;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLOSE;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.DELETE;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.FORCE_CLOSE;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.QUASI_CLOSE;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD_DEFAULT;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import 
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.recon.metrics.ReconScmContainerSyncMetrics;
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Helper class that performs targeted incremental sync between SCM and Recon
+ * container metadata. Each sync cycle scans the SCM states Recon can safely
+ * reconcile (OPEN, QUASI_CLOSED, CLOSED and DELETED), all completing in a
+ * single cycle with local pagination. SCM CLOSING and DELETING are skipped
+ * deliberately because they are intermediate states.
+ *
+ * <ol>
+ *   <li><b>OPEN:</b> scans only newly created OPEN containers starting from 
the
+ *       last-seen ID ({@code pass2OpenStartContainerId}). Existing containers 
in
+ *       later Recon states are not moved backwards to OPEN.</li>
+ *   <li><b>QUASI_CLOSED and CLOSED:</b> paginate SCM state lists; add absent
+ *       containers and advance existing Recon containers through valid local
+ *       state-machine transitions. If Recon has DELETED but SCM reports one of
+ *       these states, Recon rebuilds the container record from SCM 
metadata.</li>
+ *   <li><b>DELETED:</b> paginates SCM's DELETED list using
+ *       {@code getListOfContainerInfos}. For each container SCM reports as
+ *       DELETED, Recon drives the container to DELETED in a single call. The
+ *       DELETING list is intentionally skipped to avoid leaving Recon in an
+ *       intermediate DELETING state across cycles.</li>
+ * </ol>
+ *
+ * <h3>Scalability at 100M containers</h3>
+ * <ul>
+   *   <li>{@link #decideSyncAction()} sums per-state counts (O(number of
+   *       states), all O(1) lookups) — never calls {@code getContainers()}
+   *       which would allocate a large
+ *       {@code List<ContainerInfo>} on every decision tick.</li>
+ *   <li>Live-state sync issues <b>one</b>
+ *       {@code getExistContainerWithPipelinesInBatch} RPC per sub-batch of
+ *       absent containers — not one per absent container.
+ *       Sub-batch size is bounded by {@link 
#safeContainerWithPipelineBatchSize}
+ *       to keep the CWP response within the 128 MB IPC limit.</li>
+ *   <li>DELETED sync uses {@code getListOfContainerInfos} against SCM's
+ *       DELETED list only, bounded by {@link 
#safeContainerInfoBatchSize(int)}.
+ *       </li>
+ * </ul>
+ */
 class ReconStorageContainerSyncHelper {
 
-  // Serialized size of one ContainerID proto on the wire (varint tag + 8-byte 
long = ~12 bytes).
-  // Used to derive the maximum batch size that fits within 
ipc.maximum.data.length.
+  /**
+   * Wire size of one {@code ContainerID} proto (varint tag + 8-byte long ≈ 12 
bytes).
+   * Used to compute the maximum number of IDs that fit in one
+   * {@code getListOfContainerIDs} RPC call, where both the request (IDs sent
+   * to SCM) and the response (IDs returned by SCM) carry only ContainerID 
entries.
+   * Applies to live-state pagination and DELETED ID lists
+   * (DELETED ID list).
+   */
   private static final long CONTAINER_ID_PROTO_SIZE_BYTES = 12;
 
+  /**
+   * Conservative wire-size upper bound for one {@code ContainerInfo} proto
+   * response entry (used by {@code getListOfContainerInfos}).
+   *
+   * <p>ContainerInfo carries only container metadata — no pipeline, no
+   * DatanodeDetails. Measured estimate: containerID(8) + state(2) +
+   * usedBytes(8) + numberOfKeys(8) + owner(~20) + replicationType(2) +
+   * replicationFactor(2) + stateEnterTime(8) + sequenceId(8) +
+   * pipelineID(~20) ≈ 86 bytes. This constant uses <b>128 bytes</b>
+   * (~1.5× measured) as a safety margin.
+   *
+   * <p>Safe max batch for {@code getListOfContainerInfos} at 128 MB IPC limit:
+   * <pre>
+   *   128 MB / 128 bytes = 1,048,576 containers per call
+   *   (actual bytes: 1,048,576 × 86 ≈ 86 MB — well within limit)
+   * </pre>
+   *
+   * <p>This is ~8× larger than {@link 
#CONTAINER_WITH_PIPELINE_PROTO_SIZE_BYTES}
+   * because ContainerInfo does not include the Pipeline and DatanodeDetails
+   * entries that dominate the ContainerWithPipeline size.
+   */
+  private static final long CONTAINER_INFO_PROTO_SIZE_BYTES = 128;
+
+  /**
+   * Conservative wire-size upper bound for one {@code ContainerWithPipeline}
+   * proto response entry.
+   *
+   * <p>Measured estimate: ContainerInfoProto ~120 bytes + PipelineProto with 3
+   * DatanodeDetailsProto entries ~370 bytes ≈ 490 bytes. This constant uses
+   * <b>1024 bytes</b> — approximately 2× the measured value — to provide a
+   * comfortable safety margin against larger deployments where hostnames,
+   * certificates, or additional port entries grow the proto beyond the 
estimate.
+   *
+   * <p>This constant is used exclusively to bound the <em>response</em> of
+   * {@code getExistContainerWithPipelinesInBatch}. The <em>request</em> 
carries
+   * only container IDs and is bounded by {@link 
#CONTAINER_ID_PROTO_SIZE_BYTES}.
+   * The two constants are different because the request and response payloads
+   * have vastly different sizes (12 bytes vs ~490 bytes per entry).
+   *
+   * <h3>Safe batch limits at the 128 MB default IPC ceiling</h3>
+   * <p>{@code IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 134,217,728 bytes = 128 MB}
+   * (verified from Hadoop 3.x {@code CommonConfigurationKeys}).
+   * <pre>
+   *   Single-state CWP call (absent-container adds):
+   *     128 MB / 1024 bytes = 131,072 containers per call
+   *     (actual bytes: 131,072 × 490 ≈ 61 MB — well within limit)
+   * </pre>
+   *
+   * @see #safeContainerWithPipelineBatchSize(int)
+   */
+  private static final long CONTAINER_WITH_PIPELINE_PROTO_SIZE_BYTES = 1024;
+
+  /**
+   * Monotonic cursor for OPEN add-only sync. OPEN containers are
+   * created with increasing container IDs, so each cycle only needs to scan
+   * from the last-seen ID onward rather than rescanning the full OPEN set.
+   *
+   * <p>{@link AtomicLong} rather than {@code volatile long}: provides the same
+   * visibility guarantee but expresses concurrent intent explicitly through 
the
+   * type, following standard Java concurrency conventions. The CAS mutex in
+   * {@link ReconStorageContainerManagerFacade} ensures a single writer, so
+   * compound-atomic operations ({@code compareAndSet}, {@code getAndAdd}) are
+   * not needed — only {@code get()} and {@code set()} are used.
+   */
+  private final AtomicLong pass2OpenStartContainerId = new AtomicLong(1L);
+
   private static final Logger LOG = LoggerFactory
       .getLogger(ReconStorageContainerSyncHelper.class);
 
   private final StorageContainerServiceProvider scmServiceProvider;
   private final OzoneConfiguration ozoneConfiguration;
   private final ReconContainerManager containerManager;
+  private final ReconScmContainerSyncMetrics metrics;
+
+  /**
+   * Describes the action that the periodic scheduler should take based on the
+   * observed drift between SCM and Recon container metadata.
+   */
+  public enum SyncAction {
+    /**
+     * No drift detected — no sync work needed this cycle.
+     */
+    NO_ACTION,
+
+    /**
+     * Drift detected — run the targeted sync.
+     * This is the normal steady-state response: cheaper than a full snapshot
+     * and sufficient for the vast majority of drift scenarios.
+     */
+    TARGETED_SYNC
+  }
 
   ReconStorageContainerSyncHelper(StorageContainerServiceProvider 
scmServiceProvider,
                                   OzoneConfiguration ozoneConfiguration,
                                   ReconContainerManager containerManager) {
+    this(scmServiceProvider, ozoneConfiguration, containerManager, null);
+  }
+
+  ReconStorageContainerSyncHelper(StorageContainerServiceProvider 
scmServiceProvider,
+                                  OzoneConfiguration ozoneConfiguration,
+                                  ReconContainerManager containerManager,
+                                  ReconScmContainerSyncMetrics metrics) {
     this.scmServiceProvider = scmServiceProvider;
     this.ozoneConfiguration = ozoneConfiguration;
     this.containerManager = containerManager;
+    this.metrics = metrics;
   }
 
+  /**
+   * Decides what sync action the periodic scheduler should take based on the
+   * observed drift between SCM and Recon.
+   *
+   * <p>Decision logic:
+   * <ol>
+   *   <li>If {@code |(SCM_total - SCM_open) - (Recon_total - Recon_open)| >
+   *       ozone.recon.scm.container.threshold} (default 1,000,000): record the
+   *       large-drift event through logs and metrics, then return
+   *       {@link SyncAction#TARGETED_SYNC}.</li>
+   *   <li>If OPEN drift is positive: return
+   *       {@link SyncAction#TARGETED_SYNC}. This keeps OPEN-only gaps on the
+   *       incremental path because missing OPEN containers can be repaired
+   *       cheaply without replacing the full SCM DB.</li>
+   *   <li>Check per-state drift for QUASI_CLOSED and CLOSED against
+   *       {@code ozone.recon.scm.per.state.drift.threshold} (default 1):
+   *       <ul>
+   *         <li><b>QUASI_CLOSED</b>: detects containers stuck QUASI_CLOSED in
+   *             Recon after SCM has advanced them to CLOSED.</li>
+   *         <li><b>CLOSED</b>: detects CLOSED count mismatch when total counts
+   *             are equal (e.g., OPEN and QUASI_CLOSED counts are also equal 
but
+   *             some containers are in the wrong bucket).</li>
+   *       </ul>
+   *       If drift in either stable state reaches the threshold:
+   *       return {@link SyncAction#TARGETED_SYNC}.</li>
+   *   <li>DELETED-only total-count drift does not trigger targeted sync.
+   *       DELETED sync can converge SCM-reported DELETED containers into 
Recon,
+   *       but it cannot remove extra DELETED containers that SCM no longer
+   *       lists.</li>
+   *   <li>Otherwise: return {@link SyncAction#NO_ACTION}.</li>
+   * </ol>
+   *
+   * <p>Repairable per-state drift deliberately routes to targeted sync, not a
+   * full snapshot — targeted sync corrects these conditions efficiently 
without
+   * replacing the entire database.
+   *
+   * @return the recommended {@link SyncAction}
+   * @throws IOException if SCM RPC calls to retrieve counts fail
+   */
+  public SyncAction decideSyncAction() throws IOException {
+    int largeThreshold = ozoneConfiguration.getInt(
+        OZONE_RECON_SCM_CONTAINER_THRESHOLD,
+        OZONE_RECON_SCM_CONTAINER_THRESHOLD_DEFAULT);
+    int perStateDriftThreshold = Math.max(1, ozoneConfiguration.getInt(
+        OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD,
+        OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD_DEFAULT));
+
+    // All per-state counts use O(1) getContainerStateCount() — no heap 
allocation,
+    // no container list loading. OPEN is read here because it is needed for 
both
+    // the Check 1 non-OPEN drift and Check 2 repairable per-state drift 
checks.
+    // QUASI_CLOSED and CLOSED are read later in Check 3 where they are first
+    // needed.
+    long reconOpen =
+        
containerManager.getContainerStateCount(HddsProtos.LifeCycleState.OPEN);
+    long reconTotal = containerManager.getTotalContainerCount();
+
+    // --- Check 1: large non-OPEN drift exceeds threshold ---
+    long scmTotal = scmServiceProvider.getContainerCount();
+    long scmOpen = 
scmServiceProvider.getContainerCount(HddsProtos.LifeCycleState.OPEN);
+    long totalDrift = Math.abs(scmTotal - reconTotal);
+    long scmNonOpen = Math.max(0, scmTotal - scmOpen);
+    long reconNonOpen = Math.max(0, reconTotal - reconOpen);
+    long nonOpenDrift = Math.abs(scmNonOpen - reconNonOpen);
+
+    if (nonOpenDrift > largeThreshold) {
+      LOG.warn("Tiered sync decision: TARGETED_SYNC. "
+              + "Non-OPEN container drift {} exceeds threshold {} "
+              + "(SCM_non_OPEN={}, Recon_non_OPEN={}, SCM_total={}, 
Recon_total={}). "
+              + "Recording large-drift threshold-exceeded event and running "
+              + "targeted sync. Check Recon metrics "
+              + "fullScmDbSnapshotThresholdExceededCount, "
+              + "lastFullScmDbSnapshotThresholdExceededNonOpenDrift, and "
+              + "intervalSinceLastFullScmDbSnapshotThresholdExceededMs.",
+          nonOpenDrift, largeThreshold, scmNonOpen, reconNonOpen, scmTotal, 
reconTotal);
+      if (metrics != null) {
+        metrics.recordFullSnapshotThresholdExceededEvent(nonOpenDrift);
+      }
+      return SyncAction.TARGETED_SYNC;
+    }
+    // --- Check 2: OPEN drift is always repairable through targeted sync. ---
+    long openDrift = Math.abs(scmOpen - reconOpen);
+    if (openDrift > 0) {
+      LOG.info("OPEN container drift {} detected (SCM_OPEN={}, Recon_OPEN={}). 
"
+              + "Using targeted sync.",
+          openDrift, scmOpen, reconOpen);
+      if (metrics != null) {
+        recordPerStateDriftMetric(HddsProtos.LifeCycleState.OPEN, openDrift);
+      }
+      return SyncAction.TARGETED_SYNC;
+    }
+
+    // --- Check 3: stable-state drift that targeted sync can repair. ---
+    //
+    // Counts are read directly via O(1) per-state lookups — no
+    // derivation or subtraction. Each is an exact count from the per-state
+    // NavigableMap on Recon and from SCM's getContainerStateCount on SCM.
+    long scmQuasiClosed =
+        
scmServiceProvider.getContainerCount(HddsProtos.LifeCycleState.QUASI_CLOSED);
+    long scmClosed =
+        scmServiceProvider.getContainerCount(HddsProtos.LifeCycleState.CLOSED);
+    long reconQuasiClosed =
+        
containerManager.getContainerStateCount(HddsProtos.LifeCycleState.QUASI_CLOSED);
+    long reconClosed =
+        
containerManager.getContainerStateCount(HddsProtos.LifeCycleState.CLOSED);
+
+    for (Object[] entry : new Object[][]{
+        {HddsProtos.LifeCycleState.QUASI_CLOSED, scmQuasiClosed, 
reconQuasiClosed},
+        {HddsProtos.LifeCycleState.CLOSED, scmClosed, reconClosed}}) {
+      HddsProtos.LifeCycleState state = (HddsProtos.LifeCycleState) entry[0];
+      long scmCount = (long) entry[1];
+      long reconCount = (long) entry[2];
+      long drift = Math.abs(scmCount - reconCount);
+      if (drift >= perStateDriftThreshold) {
+        LOG.info("Per-state {} drift {} detected (SCM_{}={}, Recon_{}={}, 
threshold={}). "
+                + "Targeted sync will correct repairable state drift.",
+            state, drift, state, scmCount, state, reconCount, 
perStateDriftThreshold);
+        if (metrics != null) {
+          recordPerStateDriftMetric(state, drift);
+        }
+        return SyncAction.TARGETED_SYNC;
+      }
+    }
+
+    LOG.info("No repairable drift detected (total drift={}, non-OPEN 
drift={}). "
+            + "No sync needed.",
+        totalDrift, nonOpenDrift);
+    return SyncAction.NO_ACTION;
+  }
+
+  private void recordPerStateDriftMetric(HddsProtos.LifeCycleState state,
+                                         long drift) {
+    switch (state) {
+    case OPEN:
+      metrics.recordOpenContainerDrift(drift);
+      break;
+    case QUASI_CLOSED:
+      metrics.recordQuasiClosedContainerDrift(drift);
+      break;
+    case CLOSED:
+      metrics.recordClosedContainerDrift(drift);
+      break;
+    default:
+      break;
+    }
+  }
+
+  /**
+   * Runs targeted sync for SCM states Recon can safely reconcile.
+   */
   public boolean syncWithSCMContainerInfo() {
+    boolean open = syncContainersForState(HddsProtos.LifeCycleState.OPEN, 
true);
+    boolean quasiClosed =
+        syncContainersForState(HddsProtos.LifeCycleState.QUASI_CLOSED, false);
+    boolean closed =
+        syncContainersForState(HddsProtos.LifeCycleState.CLOSED, false);
+    boolean deleted = syncDeletedContainers();
+    return open && quasiClosed && closed && deleted;
+  }
+
+  /**
+   * Paginates one SCM lifecycle state and reconciles each returned container 
ID.
+   */
+  private boolean syncContainersForState(HddsProtos.LifeCycleState scmState,
+                                         boolean incrementalOpen) {
     try {
-      long totalContainerCount = scmServiceProvider.getContainerCount(
-          HddsProtos.LifeCycleState.CLOSED);
-      long containerCountPerCall =
-          getContainerCountPerCall(totalContainerCount);
-      ContainerID startContainerId = ContainerID.valueOf(1);
-      long retrievedContainerCount = 0;
-      if (totalContainerCount > 0) {
-        while (retrievedContainerCount < totalContainerCount) {
-          List<ContainerID> listOfContainers = scmServiceProvider.
-              getListOfContainerIDs(startContainerId,
-                  Long.valueOf(containerCountPerCall).intValue(),
-                  HddsProtos.LifeCycleState.CLOSED);
-          if (null != listOfContainers && !listOfContainers.isEmpty()) {
-            LOG.info("Got list of containers from SCM : {}", 
listOfContainers.size());
-            listOfContainers.forEach(containerID -> {
-              boolean isContainerPresentAtRecon = 
containerManager.containerExist(containerID);
-              if (!isContainerPresentAtRecon) {
-                try {
-                  ContainerWithPipeline containerWithPipeline =
-                      scmServiceProvider.getContainerWithPipeline(
-                          containerID.getId());
-                  containerManager.addNewContainer(containerWithPipeline);
-                } catch (IOException e) {
-                  LOG.error("Could not get container with pipeline " +
-                      "for container : {}", containerID);
-                }
-              }
-            });
-            long lastID = listOfContainers.get(listOfContainers.size() - 
1).getId();
-            startContainerId = ContainerID.valueOf(lastID + 1);
+      long total = scmServiceProvider.getContainerCount(scmState);

Review Comment:
   We can avoid making 2 SCM calls, one for the decision and one for the sync action.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to