devmadhuu commented on code in PR #10074:
URL: https://github.com/apache/ozone/pull/10074#discussion_r3260353419
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java:
##########
@@ -19,87 +19,881 @@
import static
org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
import static
org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLEANUP;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLOSE;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.DELETE;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.FORCE_CLOSE;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.QUASI_CLOSE;
import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE;
import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_THRESHOLD_DEFAULT;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE_DEFAULT;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD_DEFAULT;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.recon.metrics.ReconScmContainerSyncMetrics;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Helper class that performs targeted incremental sync between SCM and Recon
+ * container metadata. Each sync cycle scans the SCM states Recon can safely
+ * reconcile (OPEN, QUASI_CLOSED, CLOSED and DELETED), all completing in a
+ * single cycle with local pagination. SCM CLOSING and DELETING are skipped
+ * deliberately because they are intermediate states.
+ *
+ * <ol>
+ * <li><b>OPEN:</b> scans only newly created OPEN containers starting from
the
+ * last-seen ID ({@code pass2OpenStartContainerId}). Existing containers
in
+ * later Recon states are not moved backwards to OPEN.</li>
+ * <li><b>QUASI_CLOSED and CLOSED:</b> paginate SCM state lists; add absent
+ * containers and advance existing Recon containers through valid local
+ * state-machine transitions. If Recon has DELETED but SCM reports one of
+ * these states, Recon rebuilds the container record from SCM
metadata.</li>
+ * <li><b>DELETED:</b> paginates SCM's DELETED list using
+ * {@code getListOfContainerInfos}. For each container SCM reports as
+ * DELETED, Recon drives the container to DELETED in a single call. The
+ * DELETING list is intentionally skipped to avoid leaving Recon in an
+ * intermediate DELETING state across cycles.</li>
+ * </ol>
+ *
+ * <h3>Scalability at 100M containers</h3>
+ * <ul>
+ * <li>{@link #decideSyncAction()} sums per-state counts (O(number of
+ * states), all O(1) lookups) — never calls {@code getContainers()}
+ * which would allocate a large
+ * {@code List<ContainerInfo>} on every decision tick.</li>
+ * <li>Live-state sync issues <b>one</b>
+ * {@code getExistContainerWithPipelinesInBatch} RPC per sub-batch of
+ * absent containers — not one per absent container.
+ * Sub-batch size is bounded by
+ * {@link #safeContainerWithPipelineBatchSize(int)} to keep the
+ * ContainerWithPipeline (CWP) response within the 128 MB IPC limit.</li>
+ * <li>DELETED sync uses {@code getListOfContainerInfos} against SCM's
+ * DELETED list only, bounded by
+ * {@link #safeContainerInfoBatchSize(int)}.
+ * </li>
+ * </ul>
+ */
class ReconStorageContainerSyncHelper {
- // Serialized size of one ContainerID proto on the wire (varint tag + 8-byte
long = ~12 bytes).
- // Used to derive the maximum batch size that fits within
ipc.maximum.data.length.
+ /**
+ * Wire size of one {@code ContainerID} proto (varint tag + 8-byte long ≈ 12
bytes).
+ * Used to compute the maximum number of IDs that fit in one
+ * {@code getListOfContainerIDs} RPC call, where both the request (IDs sent
+ * to SCM) and the response (IDs returned by SCM) carry only ContainerID
entries.
+ * Applies to both live-state ID pagination and DELETED ID lists.
+ */
private static final long CONTAINER_ID_PROTO_SIZE_BYTES = 12;
+ /**
+ * Conservative wire-size upper bound for one {@code ContainerInfo} proto
+ * response entry (used by {@code getListOfContainerInfos}).
+ *
+ * <p>ContainerInfo carries only container metadata — no pipeline, no
+ * DatanodeDetails. Measured estimate: containerID(8) + state(2) +
+ * usedBytes(8) + numberOfKeys(8) + owner(~20) + replicationType(2) +
+ * replicationFactor(2) + stateEnterTime(8) + sequenceId(8) +
+ * pipelineID(~20) ≈ 86 bytes. This constant uses <b>128 bytes</b>
+ * (~1.5× measured) as a safety margin.
+ *
+ * <p>Safe max batch for {@code getListOfContainerInfos} at 128 MB IPC limit:
+ * <pre>
+ * 128 MB / 128 bytes = 1,048,576 containers per call
+ * (actual bytes: 1,048,576 × 86 ≈ 86 MB — well within limit)
+ * </pre>
+ *
+ * <p>This batch limit is ~8× larger than the one derived from
+ * {@link #CONTAINER_WITH_PIPELINE_PROTO_SIZE_BYTES} (131,072 per call)
+ * because ContainerInfo does not include the Pipeline and DatanodeDetails
+ * entries that dominate the ContainerWithPipeline size.
+ */
+ private static final long CONTAINER_INFO_PROTO_SIZE_BYTES = 128;
+
+ /**
+ * Conservative wire-size upper bound for one {@code ContainerWithPipeline}
+ * proto response entry.
+ *
+ * <p>Measured estimate: ContainerInfoProto ~120 bytes + PipelineProto with 3
+ * DatanodeDetailsProto entries ~370 bytes ≈ 490 bytes. This constant uses
+ * <b>1024 bytes</b> — approximately 2× the measured value — to provide a
+ * comfortable safety margin against larger deployments where hostnames,
+ * certificates, or additional port entries grow the proto beyond the
estimate.
+ *
+ * <p>This constant is used exclusively to bound the <em>response</em> of
+ * {@code getExistContainerWithPipelinesInBatch}. The <em>request</em>
+ * carries only container IDs and is bounded by
+ * {@link #CONTAINER_ID_PROTO_SIZE_BYTES}.
+ * The two constants are different because the request and response payloads
+ * have vastly different sizes (12 bytes vs ~490 bytes per entry).
+ *
+ * <h3>Safe batch limits at the 128 MB default IPC ceiling</h3>
+ * <p>{@code IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 134,217,728 bytes = 128 MB}
+ * (verified from Hadoop 3.x {@code CommonConfigurationKeys}).
+ * <pre>
+ * Single-state CWP call (absent-container adds):
+ * 128 MB / 1024 bytes = 131,072 containers per call
+ * (actual bytes: 131,072 × 490 ≈ 61 MB — well within limit)
+ * </pre>
+ *
+ * @see #safeContainerWithPipelineBatchSize(int)
+ */
+ private static final long CONTAINER_WITH_PIPELINE_PROTO_SIZE_BYTES = 1024;
+
+ /**
+ * Monotonic cursor for OPEN add-only sync. OPEN containers are
+ * created with increasing container IDs, so each cycle only needs to scan
+ * from the last-seen ID onward rather than rescanning the full OPEN set.
+ *
+ * <p>{@link AtomicLong} rather than {@code volatile long}: provides the same
+ * visibility guarantee but expresses concurrent intent explicitly through
the
+ * type, following standard Java concurrency conventions. The CAS mutex in
+ * {@link ReconStorageContainerManagerFacade} ensures a single writer, so
+ * compound-atomic operations ({@code compareAndSet}, {@code getAndAdd}) are
+ * not needed — only {@code get()} and {@code set()} are used.
+ */
+ private final AtomicLong pass2OpenStartContainerId = new AtomicLong(1L);
+
private static final Logger LOG = LoggerFactory
.getLogger(ReconStorageContainerSyncHelper.class);
private final StorageContainerServiceProvider scmServiceProvider;
private final OzoneConfiguration ozoneConfiguration;
private final ReconContainerManager containerManager;
+ private final ReconScmContainerSyncMetrics metrics;
+
+ /**
+ * Describes the action that the periodic scheduler should take based on the
+ * observed drift between SCM and Recon container metadata.
+ */
+ public enum SyncAction {
+ /**
+ * No drift detected — no sync work needed this cycle.
+ */
+ NO_ACTION,
+
+ /**
+ * Drift detected — run the targeted sync.
+ * This is the normal steady-state response: cheaper than a full snapshot
+ * and sufficient for the vast majority of drift scenarios.
+ */
+ TARGETED_SYNC
+ }
ReconStorageContainerSyncHelper(StorageContainerServiceProvider
scmServiceProvider,
OzoneConfiguration ozoneConfiguration,
ReconContainerManager containerManager) {
+ this(scmServiceProvider, ozoneConfiguration, containerManager, null);
+ }
+
+ ReconStorageContainerSyncHelper(StorageContainerServiceProvider
scmServiceProvider,
+ OzoneConfiguration ozoneConfiguration,
+ ReconContainerManager containerManager,
+ ReconScmContainerSyncMetrics metrics) {
this.scmServiceProvider = scmServiceProvider;
this.ozoneConfiguration = ozoneConfiguration;
this.containerManager = containerManager;
+ this.metrics = metrics;
}
+ /**
+ * Decides what sync action the periodic scheduler should take based on the
+ * observed drift between SCM and Recon.
+ *
+ * <p>Decision logic:
+ * <ol>
+ * <li>If {@code |(SCM_total - SCM_open) - (Recon_total - Recon_open)| >
+ * ozone.recon.scm.container.threshold} (default 1,000,000): record the
+ * large-drift event through logs and metrics, then return
+ * {@link SyncAction#TARGETED_SYNC}.</li>
+ * <li>If OPEN drift is positive: return
+ * {@link SyncAction#TARGETED_SYNC}. This keeps OPEN-only gaps on the
+ * incremental path because missing OPEN containers can be repaired
+ * cheaply without replacing the full SCM DB.</li>
+ * <li>Check per-state drift for QUASI_CLOSED and CLOSED against
+ * {@code ozone.recon.scm.per.state.drift.threshold} (default 1):
+ * <ul>
+ * <li><b>QUASI_CLOSED</b>: detects containers stuck QUASI_CLOSED in
+ * Recon after SCM has advanced them to CLOSED.</li>
+ * <li><b>CLOSED</b>: detects CLOSED count mismatch when total counts
+ * are equal (e.g., OPEN and QUASI_CLOSED counts are also equal
but
+ * some containers are in the wrong bucket).</li>
+ * </ul>
+ * If drift in either stable state reaches the threshold:
+ * return {@link SyncAction#TARGETED_SYNC}.</li>
+ * <li>DELETED-only total-count drift does not trigger targeted sync.
+ * DELETED sync can converge SCM-reported DELETED containers into
Recon,
+ * but it cannot remove extra DELETED containers that SCM no longer
+ * lists.</li>
+ * <li>Otherwise: return {@link SyncAction#NO_ACTION}.</li>
+ * </ol>
+ *
+ * <p>Repairable per-state drift deliberately routes to targeted sync, not a
+ * full snapshot — targeted sync corrects these conditions efficiently
without
+ * replacing the entire database.
+ *
+ * @return the recommended {@link SyncAction}
+ * @throws IOException if SCM RPC calls to retrieve counts fail
+ */
+ public SyncAction decideSyncAction() throws IOException {
+ int largeThreshold = ozoneConfiguration.getInt(
+ OZONE_RECON_SCM_CONTAINER_THRESHOLD,
+ OZONE_RECON_SCM_CONTAINER_THRESHOLD_DEFAULT);
+ int perStateDriftThreshold = Math.max(1, ozoneConfiguration.getInt(
+ OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD,
+ OZONE_RECON_SCM_PER_STATE_DRIFT_THRESHOLD_DEFAULT));
+
+ // All per-state counts use O(1) getContainerStateCount() — no heap
+ // allocation, no container list loading. OPEN is read here because it is
+ // needed for both the Check 1 non-OPEN drift and the Check 2 OPEN drift
+ // checks. QUASI_CLOSED and CLOSED are read later in Check 3 where they
+ // are first needed.
+ long reconOpen =
+
containerManager.getContainerStateCount(HddsProtos.LifeCycleState.OPEN);
+ long reconTotal = containerManager.getTotalContainerCount();
+
+ // --- Check 1: large non-OPEN drift exceeds threshold ---
+ long scmTotal = scmServiceProvider.getContainerCount();
+ long scmOpen =
scmServiceProvider.getContainerCount(HddsProtos.LifeCycleState.OPEN);
+ long totalDrift = Math.abs(scmTotal - reconTotal);
+ long scmNonOpen = Math.max(0, scmTotal - scmOpen);
+ long reconNonOpen = Math.max(0, reconTotal - reconOpen);
+ long nonOpenDrift = Math.abs(scmNonOpen - reconNonOpen);
+
+ if (nonOpenDrift > largeThreshold) {
+ LOG.warn("Tiered sync decision: TARGETED_SYNC. "
+ + "Non-OPEN container drift {} exceeds threshold {} "
+ + "(SCM_non_OPEN={}, Recon_non_OPEN={}, SCM_total={},
Recon_total={}). "
+ + "Recording large-drift threshold-exceeded event and running "
+ + "targeted sync. Check Recon metrics "
+ + "fullScmDbSnapshotThresholdExceededCount, "
+ + "lastFullScmDbSnapshotThresholdExceededNonOpenDrift, and "
+ + "intervalSinceLastFullScmDbSnapshotThresholdExceededMs.",
+ nonOpenDrift, largeThreshold, scmNonOpen, reconNonOpen, scmTotal,
reconTotal);
+ if (metrics != null) {
+ metrics.recordFullSnapshotThresholdExceededEvent(nonOpenDrift);
+ }
+ return SyncAction.TARGETED_SYNC;
+ }
+ // --- Check 2: OPEN drift is always repairable through targeted sync. ---
+ long openDrift = Math.abs(scmOpen - reconOpen);
+ if (openDrift > 0) {
+ LOG.info("OPEN container drift {} detected (SCM_OPEN={}, Recon_OPEN={}).
"
+ + "Using targeted sync.",
+ openDrift, scmOpen, reconOpen);
+ if (metrics != null) {
+ recordPerStateDriftMetric(HddsProtos.LifeCycleState.OPEN, openDrift);
+ }
+ return SyncAction.TARGETED_SYNC;
+ }
+
+ // --- Check 3: stable-state drift that targeted sync can repair. ---
+ //
+ // Counts are read directly via O(1) per-state lookups — no
+ // derivation or subtraction. Each is an exact count from the per-state
+ // NavigableMap on Recon and from SCM's getContainerStateCount on SCM.
+ long scmQuasiClosed =
+
scmServiceProvider.getContainerCount(HddsProtos.LifeCycleState.QUASI_CLOSED);
+ long scmClosed =
+ scmServiceProvider.getContainerCount(HddsProtos.LifeCycleState.CLOSED);
+ long reconQuasiClosed =
+
containerManager.getContainerStateCount(HddsProtos.LifeCycleState.QUASI_CLOSED);
+ long reconClosed =
+
containerManager.getContainerStateCount(HddsProtos.LifeCycleState.CLOSED);
+
+ for (Object[] entry : new Object[][]{
+ {HddsProtos.LifeCycleState.QUASI_CLOSED, scmQuasiClosed,
reconQuasiClosed},
+ {HddsProtos.LifeCycleState.CLOSED, scmClosed, reconClosed}}) {
+ HddsProtos.LifeCycleState state = (HddsProtos.LifeCycleState) entry[0];
+ long scmCount = (long) entry[1];
+ long reconCount = (long) entry[2];
+ long drift = Math.abs(scmCount - reconCount);
+ if (drift >= perStateDriftThreshold) {
+ LOG.info("Per-state {} drift {} detected (SCM_{}={}, Recon_{}={},
threshold={}). "
+ + "Targeted sync will correct repairable state drift.",
+ state, drift, state, scmCount, state, reconCount,
perStateDriftThreshold);
+ if (metrics != null) {
+ recordPerStateDriftMetric(state, drift);
+ }
+ return SyncAction.TARGETED_SYNC;
+ }
+ }
+
+ LOG.info("No repairable drift detected (total drift={}, non-OPEN
drift={}). "
+ + "No sync needed.",
+ totalDrift, nonOpenDrift);
+ return SyncAction.NO_ACTION;
+ }
+
+ private void recordPerStateDriftMetric(HddsProtos.LifeCycleState state,
+ long drift) {
+ switch (state) {
+ case OPEN:
+ metrics.recordOpenContainerDrift(drift);
+ break;
+ case QUASI_CLOSED:
+ metrics.recordQuasiClosedContainerDrift(drift);
+ break;
+ case CLOSED:
+ metrics.recordClosedContainerDrift(drift);
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * Runs targeted sync for SCM states Recon can safely reconcile.
+ */
public boolean syncWithSCMContainerInfo() {
+ boolean open = syncContainersForState(HddsProtos.LifeCycleState.OPEN,
true);
+ boolean quasiClosed =
+ syncContainersForState(HddsProtos.LifeCycleState.QUASI_CLOSED, false);
+ boolean closed =
+ syncContainersForState(HddsProtos.LifeCycleState.CLOSED, false);
+ boolean deleted = syncDeletedContainers();
+ return open && quasiClosed && closed && deleted;
+ }
+
+ /**
+ * Paginates one SCM lifecycle state and reconciles each returned container
ID.
+ */
+ private boolean syncContainersForState(HddsProtos.LifeCycleState scmState,
+ boolean incrementalOpen) {
try {
- long totalContainerCount = scmServiceProvider.getContainerCount(
- HddsProtos.LifeCycleState.CLOSED);
- long containerCountPerCall =
- getContainerCountPerCall(totalContainerCount);
- ContainerID startContainerId = ContainerID.valueOf(1);
- long retrievedContainerCount = 0;
- if (totalContainerCount > 0) {
- while (retrievedContainerCount < totalContainerCount) {
- List<ContainerID> listOfContainers = scmServiceProvider.
- getListOfContainerIDs(startContainerId,
- Long.valueOf(containerCountPerCall).intValue(),
- HddsProtos.LifeCycleState.CLOSED);
- if (null != listOfContainers && !listOfContainers.isEmpty()) {
- LOG.info("Got list of containers from SCM : {}",
listOfContainers.size());
- listOfContainers.forEach(containerID -> {
- boolean isContainerPresentAtRecon =
containerManager.containerExist(containerID);
- if (!isContainerPresentAtRecon) {
- try {
- ContainerWithPipeline containerWithPipeline =
- scmServiceProvider.getContainerWithPipeline(
- containerID.getId());
- containerManager.addNewContainer(containerWithPipeline);
- } catch (IOException e) {
- LOG.error("Could not get container with pipeline " +
- "for container : {}", containerID);
- }
- }
- });
- long lastID = listOfContainers.get(listOfContainers.size() -
1).getId();
- startContainerId = ContainerID.valueOf(lastID + 1);
+ long total = scmServiceProvider.getContainerCount(scmState);
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]