rakeshadr commented on code in PR #10074:
URL: https://github.com/apache/ozone/pull/10074#discussion_r3264070541


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconScmContainerSyncMetrics.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.metrics;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * Metrics for Recon SCM targeted sync execution.
+ */
+@InterfaceAudience.Private
+@Metrics(about = "Recon SCM Container Sync Metrics", context = 
OzoneConsts.OZONE)
+public final class ReconScmContainerSyncMetrics {

Review Comment:
   I see that these metrics have been removed. It seems this was not 
communicated properly: we need to remove the snapshot-related metrics, but 
these are targeted sync metrics. Could you please add them back? Thanks!
   
    ```
    @Metric(about = "Last OPEN container drift that triggered targeted sync")
     private MutableGaugeLong lastOpenContainerDrift;
   
     @Metric(about = "Last QUASI_CLOSED container drift that triggered targeted 
sync")
     private MutableGaugeLong lastQuasiClosedContainerDrift;
   
     @Metric(about = "Last CLOSED container drift that triggered targeted sync")
     private MutableGaugeLong lastClosedContainerDrift;
   ```
   
   



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java:
##########
@@ -19,86 +19,731 @@
 
 import static 
org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
 import static 
org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLEANUP;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLOSE;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.DELETE;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.FORCE_CLOSE;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.QUASI_CLOSE;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD_DEFAULT;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import 
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.recon.metrics.ReconScmContainerSyncMetrics;
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Helper class that performs targeted incremental sync between SCM and Recon
+ * container metadata. Executes four passes per sync cycle:
+ *
+ * <ol>
+ *   <li><b>Pass 1 — CLOSED (SCM-driven, add + correct):</b> fetches SCM's
+ *       CLOSED container ID list, adds any absent from Recon, and corrects
+ *       containers that are OPEN or CLOSING in Recon but CLOSED in SCM.</li>
+ *   <li><b>Pass 2 — OPEN (SCM-driven, add only):</b> adds OPEN containers
+ *       that are absent from Recon entirely (e.g., created while Recon was
+ *       down).</li>
+ *   <li><b>Pass 3 — QUASI_CLOSED (SCM-driven, add only):</b> adds
+ *       QUASI_CLOSED containers absent from Recon. Requires that SCM returns
+ *       container metadata with a null pipeline when pipeline lookup fails, 
and
+ *       that Recon's {@code addNewContainer} handles a null pipeline 
gracefully;
+ *       otherwise QUASI_CLOSED containers whose pipelines have been cleaned up
+ *       will fail with {@code NullPointerException} or {@code 
IOException}.</li>
+ *   <li><b>Pass 4 — DELETED retirement (Recon-driven, transition only):</b>
+ *       scans Recon's CLOSED and QUASI_CLOSED containers in batches, queries
+ *       SCM for each, and transitions any that SCM reports as DELETED.
+ *       Intentionally Recon-driven (not SCM-driven) because SCM's DELETED
+ *       list grows unboundedly; starting from Recon's bounded set of
+ *       non-terminal containers is always more efficient.</li>
+ * </ol>
+ */
 class ReconStorageContainerSyncHelper {
 
   // Serialized size of one ContainerID proto on the wire (varint tag + 8-byte 
long = ~12 bytes).
   // Used to derive the maximum batch size that fits within 
ipc.maximum.data.length.
   private static final long CONTAINER_ID_PROTO_SIZE_BYTES = 12;
 
+  /**
+   * Rotating cursor for Pass 4 (DELETED retirement). Tracks the list position
+   * where the next sync cycle should begin so that all candidates are
+   * eventually covered regardless of batch size. Volatile because it is
+   * updated by the scheduler thread and read by tests.
+   */
+  private volatile int pass4BatchOffset = 0;
+  /**
+   * Monotonic cursor for Pass 2 (OPEN add-only sync). OPEN containers are
+   * created with increasing container IDs, so each cycle only needs to scan
+   * from the last-seen ID onward rather than rescanning the full OPEN set.
+   */
+  private volatile long pass2OpenStartContainerId = 1L;
+
   private static final Logger LOG = LoggerFactory
       .getLogger(ReconStorageContainerSyncHelper.class);
 
   private final StorageContainerServiceProvider scmServiceProvider;
   private final OzoneConfiguration ozoneConfiguration;
   private final ReconContainerManager containerManager;
+  private final ReconScmContainerSyncMetrics metrics;
+
+  /**
+   * Describes the action that the periodic scheduler should take based on the
+   * observed drift between SCM and Recon container metadata.
+   */
+  public enum SyncAction {
+    /**
+     * No drift detected — no sync work needed this cycle.
+     */
+    NO_ACTION,
+
+    /**
+     * Small or per-state drift detected — run the four-pass targeted sync.
+     * This is the normal steady-state response: cheaper than a full snapshot
+     * and sufficient for the vast majority of drift scenarios.
+     */
+    TARGETED_SYNC,
+
+    /**
+     * Large total-count drift detected — replace Recon's entire SCM DB with a

Review Comment:
   I can see that the metrics have been fully removed in the new commits. 
Please add these metrics with proper naming 
('recordLargeDriftThresholdExceededEvent'), as you mentioned in your comment 
above.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java:
##########
@@ -19,86 +19,731 @@
 
 import static 
org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
 import static 
org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLEANUP;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLOSE;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.DELETE;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.FORCE_CLOSE;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.QUASI_CLOSE;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD_DEFAULT;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import 
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.recon.metrics.ReconScmContainerSyncMetrics;
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Helper class that performs targeted incremental sync between SCM and Recon
+ * container metadata. Executes four passes per sync cycle:
+ *
+ * <ol>
+ *   <li><b>Pass 1 — CLOSED (SCM-driven, add + correct):</b> fetches SCM's
+ *       CLOSED container ID list, adds any absent from Recon, and corrects
+ *       containers that are OPEN or CLOSING in Recon but CLOSED in SCM.</li>
+ *   <li><b>Pass 2 — OPEN (SCM-driven, add only):</b> adds OPEN containers
+ *       that are absent from Recon entirely (e.g., created while Recon was
+ *       down).</li>
+ *   <li><b>Pass 3 — QUASI_CLOSED (SCM-driven, add only):</b> adds
+ *       QUASI_CLOSED containers absent from Recon. Requires that SCM returns
+ *       container metadata with a null pipeline when pipeline lookup fails, 
and
+ *       that Recon's {@code addNewContainer} handles a null pipeline 
gracefully;
+ *       otherwise QUASI_CLOSED containers whose pipelines have been cleaned up
+ *       will fail with {@code NullPointerException} or {@code 
IOException}.</li>
+ *   <li><b>Pass 4 — DELETED retirement (Recon-driven, transition only):</b>
+ *       scans Recon's CLOSED and QUASI_CLOSED containers in batches, queries
+ *       SCM for each, and transitions any that SCM reports as DELETED.
+ *       Intentionally Recon-driven (not SCM-driven) because SCM's DELETED
+ *       list grows unboundedly; starting from Recon's bounded set of
+ *       non-terminal containers is always more efficient.</li>
+ * </ol>
+ */
 class ReconStorageContainerSyncHelper {
 
   // Serialized size of one ContainerID proto on the wire (varint tag + 8-byte 
long = ~12 bytes).
   // Used to derive the maximum batch size that fits within 
ipc.maximum.data.length.
   private static final long CONTAINER_ID_PROTO_SIZE_BYTES = 12;
 
+  /**
+   * Rotating cursor for Pass 4 (DELETED retirement). Tracks the list position
+   * where the next sync cycle should begin so that all candidates are
+   * eventually covered regardless of batch size. Volatile because it is
+   * updated by the scheduler thread and read by tests.
+   */
+  private volatile int pass4BatchOffset = 0;
+  /**
+   * Monotonic cursor for Pass 2 (OPEN add-only sync). OPEN containers are
+   * created with increasing container IDs, so each cycle only needs to scan
+   * from the last-seen ID onward rather than rescanning the full OPEN set.
+   */
+  private volatile long pass2OpenStartContainerId = 1L;
+
   private static final Logger LOG = LoggerFactory
       .getLogger(ReconStorageContainerSyncHelper.class);
 
   private final StorageContainerServiceProvider scmServiceProvider;
   private final OzoneConfiguration ozoneConfiguration;
   private final ReconContainerManager containerManager;
+  private final ReconScmContainerSyncMetrics metrics;
+
+  /**
+   * Describes the action that the periodic scheduler should take based on the
+   * observed drift between SCM and Recon container metadata.
+   */
+  public enum SyncAction {
+    /**
+     * No drift detected — no sync work needed this cycle.
+     */
+    NO_ACTION,
+
+    /**
+     * Small or per-state drift detected — run the four-pass targeted sync.
+     * This is the normal steady-state response: cheaper than a full snapshot
+     * and sufficient for the vast majority of drift scenarios.
+     */
+    TARGETED_SYNC,
+
+    /**
+     * Large total-count drift detected — replace Recon's entire SCM DB with a
+     * fresh checkpoint from SCM. Reserved for cases where targeted sync would
+     * be unreliable (e.g., Recon was down for hours and hundreds of containers
+     * changed state).
+     */
+    FULL_SNAPSHOT
+  }
 
   ReconStorageContainerSyncHelper(StorageContainerServiceProvider 
scmServiceProvider,
                                   OzoneConfiguration ozoneConfiguration,
                                   ReconContainerManager containerManager) {
+    this(scmServiceProvider, ozoneConfiguration, containerManager, null);
+  }
+
+  ReconStorageContainerSyncHelper(StorageContainerServiceProvider 
scmServiceProvider,
+                                  OzoneConfiguration ozoneConfiguration,
+                                  ReconContainerManager containerManager,
+                                  ReconScmContainerSyncMetrics metrics) {
     this.scmServiceProvider = scmServiceProvider;
     this.ozoneConfiguration = ozoneConfiguration;
     this.containerManager = containerManager;
+    this.metrics = metrics;
   }
 
+  /**
+   * Decides what sync action the periodic scheduler should take based on the
+   * observed drift between SCM and Recon.
+   *
+   * <p>Decision logic:
+   * <ol>
+   *   <li>If {@code |(SCM_total - SCM_open) - (Recon_total - Recon_open)| >
+   *       ozone.recon.scm.container.threshold} (default 1,000,000): return
+   *       {@link SyncAction#FULL_SNAPSHOT}. Large drift in non-OPEN containers
+   *       means Recon is badly behind on stable SCM state and a full 
checkpoint
+   *       replacement is cheaper and more reliable at that scale.</li>
+   *   <li>If total drift is positive but the non-OPEN drift is at or below the
+   *       threshold: return {@link SyncAction#TARGETED_SYNC}. This keeps large
+   *       OPEN-only gaps on the incremental path because missing OPEN
+   *       containers can be repaired cheaply without replacing the full SCM 
DB.</li>
+   *   <li>If total drift is zero, check per-state drift for each active
+   *       (non-terminal) lifecycle state against
+   *       {@code ozone.recon.scm.per.state.drift.threshold} (default 5):
+   *       <ul>
+   *         <li><b>OPEN</b>: detects containers stuck OPEN in Recon after SCM
+   *             has advanced them to QUASI_CLOSED or CLOSED.</li>
+   *         <li><b>QUASI_CLOSED</b>: detects containers stuck QUASI_CLOSED in
+   *             Recon after SCM has advanced them to CLOSED. This case 
produces
+   *             zero OPEN drift and is invisible to an OPEN-only check.</li>
+   *       </ul>
+   *       If drift in <em>any</em> checked state exceeds the threshold:
+   *       return {@link SyncAction#TARGETED_SYNC}.</li>
+   *   <li>Otherwise: return {@link SyncAction#NO_ACTION}.</li>
+   * </ol>
+   *
+   * <p>Per-state drift deliberately routes to targeted sync, not a full
+   * snapshot — the targeted sync's per-state passes correct each condition
+   * efficiently without replacing the entire database.
+   *
+   * @return the recommended {@link SyncAction}
+   * @throws IOException if SCM RPC calls to retrieve counts fail
+   */
+  public SyncAction decideSyncAction() throws IOException {
+    int largeThreshold = ozoneConfiguration.getInt(
+        OZONE_RECON_SCM_CONTAINER_THRESHOLD,
+        OZONE_RECON_SCM_CONTAINER_THRESHOLD_DEFAULT);
+    int perStateDriftThreshold = ozoneConfiguration.getInt(
+        OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD,
+        OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD_DEFAULT);
+    List<ContainerInfo> reconContainers = containerManager.getContainers();
+    long reconTotal = reconContainers.size();
+    long reconOpen = reconContainers.stream()
+        .filter(c -> c.getState() == HddsProtos.LifeCycleState.OPEN)
+        .count();
+
+    // --- Check 1: large non-OPEN drift escalates to full snapshot ---
+    long scmTotal = scmServiceProvider.getContainerCount();
+    long scmOpen = 
scmServiceProvider.getContainerCount(HddsProtos.LifeCycleState.OPEN);
+    long totalDrift = Math.abs(scmTotal - reconTotal);
+    long scmNonOpen = Math.max(0, scmTotal - scmOpen);
+    long reconNonOpen = Math.max(0, reconTotal - reconOpen);
+    long nonOpenDrift = Math.abs(scmNonOpen - reconNonOpen);
+
+    if (nonOpenDrift > largeThreshold) {
+      LOG.warn("Non-OPEN container drift {} exceeds threshold {} "

Review Comment:
   Agreed. This will be evaluated during system testing; I am adding it to 
the system test scenarios for evaluation. No dev code changes are needed 
here. Closing the comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to