devmadhuu commented on code in PR #10074:
URL: https://github.com/apache/ozone/pull/10074#discussion_r3283024589
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java:
##########
@@ -53,53 +173,545 @@ class ReconStorageContainerSyncHelper {
this.containerManager = containerManager;
}
+ /**
+ * Runs targeted sync for SCM states Recon can safely reconcile.
+ */
public boolean syncWithSCMContainerInfo() {
+ boolean open = syncContainersForState(HddsProtos.LifeCycleState.OPEN,
true);
+ boolean quasiClosed =
+ syncContainersForState(HddsProtos.LifeCycleState.QUASI_CLOSED, false);
+ boolean closed =
+ syncContainersForState(HddsProtos.LifeCycleState.CLOSED, false);
+ boolean deleted = syncDeletedContainers();
+ return open && quasiClosed && closed && deleted;
+ }
+
+ /**
+ * Paginates one SCM lifecycle state and reconciles each returned container ID.
+ */
+ private boolean syncContainersForState(HddsProtos.LifeCycleState scmState,
+ boolean incrementalOpen) {
try {
- long totalContainerCount = scmServiceProvider.getContainerCount(
- HddsProtos.LifeCycleState.CLOSED);
- long containerCountPerCall =
- getContainerCountPerCall(totalContainerCount);
- ContainerID startContainerId = ContainerID.valueOf(1);
- long retrievedContainerCount = 0;
- if (totalContainerCount > 0) {
- while (retrievedContainerCount < totalContainerCount) {
- List<ContainerID> listOfContainers = scmServiceProvider.
- getListOfContainerIDs(startContainerId,
- Long.valueOf(containerCountPerCall).intValue(),
- HddsProtos.LifeCycleState.CLOSED);
- if (null != listOfContainers && !listOfContainers.isEmpty()) {
- LOG.info("Got list of containers from SCM : {}",
listOfContainers.size());
- listOfContainers.forEach(containerID -> {
- boolean isContainerPresentAtRecon =
containerManager.containerExist(containerID);
- if (!isContainerPresentAtRecon) {
- try {
- ContainerWithPipeline containerWithPipeline =
- scmServiceProvider.getContainerWithPipeline(
- containerID.getId());
- containerManager.addNewContainer(containerWithPipeline);
- } catch (IOException e) {
- LOG.error("Could not get container with pipeline " +
- "for container : {}", containerID);
- }
- }
- });
- long lastID = listOfContainers.get(listOfContainers.size() -
1).getId();
- startContainerId = ContainerID.valueOf(lastID + 1);
+ long total = scmServiceProvider.getContainerCount(scmState);
+ if (total == 0) {
+ LOG.debug("{} sync: no containers found in SCM.", scmState);
+ return true;
+ }
+
+ int batchSize = (int) getContainerCountPerCall(total);
+ long initialStart = incrementalOpen ? pass2OpenStartContainerId.get() :
1L;
+ ContainerID startContainerId = ContainerID.valueOf(initialStart);
+ long retrieved = 0;
+ int addedCount = 0;
+ int reconciledCount = 0;
+
+ while (true) {
+ List<ContainerID> batch = scmServiceProvider.getListOfContainerIDs(
+ startContainerId, batchSize, scmState);
+ if (batch == null || batch.isEmpty()) {
+ break;
+ }
+
+ List<Long> absentIds = new ArrayList<>();
+ List<ContainerID> presentIds = new ArrayList<>();
+ for (ContainerID containerID : batch) {
+ if (!containerManager.containerExist(containerID)) {
+ absentIds.add(containerID.getId());
} else {
- LOG.info("No containers found at SCM in CLOSED state");
- return false;
+ presentIds.add(containerID);
}
- retrievedContainerCount += containerCountPerCall;
}
+
+ if (!absentIds.isEmpty()) {
+ addedCount += batchedAddMissingContainers(
+ absentIds, scmState, scmState + " sync");
+ }
+
+ for (ContainerID containerID : presentIds) {
+ reconciledCount += reconcileExistingContainer(containerID, scmState);
+ }
+
+ long lastID = batch.get(batch.size() - 1).getId();
+ long nextID = lastID + 1;
+ if (incrementalOpen) {
+ pass2OpenStartContainerId.set(nextID);
+ }
+ startContainerId = ContainerID.valueOf(nextID);
+ retrieved += batch.size();
+ }
+
+ LOG.info("{} sync complete from start {}, checked {}, added {},
reconciled {}.",
+ scmState, initialStart, retrieved, addedCount, reconciledCount);
+ return true;
+ } catch (Exception e) {
+ LOG.error("{} sync: unexpected error.", scmState, e);
+ return false;
+ }
+ }
+
+ private int reconcileExistingContainer(ContainerID containerID,
+ HddsProtos.LifeCycleState scmState) {
+ try {
+ ContainerInfo reconContainer =
containerManager.getContainer(containerID);
+ HddsProtos.LifeCycleState reconState = reconContainer.getState();
+ if (reconState == scmState) {
+ return 0;
+ }
+
+ switch (scmState) {
+ case OPEN:
+ LOG.debug("Skipping container {} because SCM reports OPEN while Recon "
+ + "already has state {}.", containerID, reconState);
+ return 0;
+ case QUASI_CLOSED:
+ return reconcileToQuasiClosed(containerID, reconContainer, reconState);
+ case CLOSED:
+ return reconcileToClosed(containerID, reconContainer, reconState);
+ default:
+ LOG.debug("Skipping container {} for unsupported SCM sync state {}.",
+ containerID, scmState);
+ return 0;
+ }
+ } catch (ContainerNotFoundException e) {
+ LOG.debug("Container {} vanished from Recon during {} sync.",
+ containerID, scmState);
+ }
+ return 0;
+ }
+
+ private int reconcileToQuasiClosed(ContainerID containerID,
+ ContainerInfo reconContainer,
+ HddsProtos.LifeCycleState reconState) {
+ try {
+ if (reconState == HddsProtos.LifeCycleState.DELETED) {
+ return rebuildContainerFromScm(containerID,
+ HddsProtos.LifeCycleState.QUASI_CLOSED);
+ }
+ if (reconState == HddsProtos.LifeCycleState.OPEN) {
+ containerManager.transitionOpenToClosing(containerID, reconContainer);
+ reconState = HddsProtos.LifeCycleState.CLOSING;
+ }
+ if (reconState == HddsProtos.LifeCycleState.CLOSING) {
+ containerManager.updateContainerState(containerID, QUASI_CLOSE);
+ LOG.info("Container {} corrected to QUASI_CLOSED based on SCM state.",
+ containerID);
+ return 1;
+ }
+ LOG.debug("Skipping container {} because SCM reports QUASI_CLOSED while "
+ + "Recon has state {}.", containerID, reconState);
+ } catch (InvalidStateTransitionException | IOException e) {
+ LOG.warn("Failed to reconcile container {} to QUASI_CLOSED.",
+ containerID, e);
+ }
+ return 0;
+ }
+
+ private int reconcileToClosed(ContainerID containerID,
+ ContainerInfo reconContainer,
+ HddsProtos.LifeCycleState reconState) {
+ try {
+ if (reconState == HddsProtos.LifeCycleState.DELETED) {
+ return rebuildContainerFromScm(containerID,
HddsProtos.LifeCycleState.CLOSED);
+ }
+ if (reconState == HddsProtos.LifeCycleState.OPEN) {
+ containerManager.transitionOpenToClosing(containerID, reconContainer);
+ reconState = HddsProtos.LifeCycleState.CLOSING;
+ }
+ if (reconState == HddsProtos.LifeCycleState.CLOSING) {
+ containerManager.updateContainerState(containerID, CLOSE);
+ LOG.info("Container {} corrected to CLOSED based on SCM state.",
+ containerID);
+ return 1;
+ }
+ if (reconState == HddsProtos.LifeCycleState.QUASI_CLOSED) {
+ containerManager.updateContainerState(containerID, FORCE_CLOSE);
+ LOG.info("Container {} corrected from QUASI_CLOSED to CLOSED based "
+ + "on SCM state.", containerID);
+ return 1;
+ }
+ LOG.debug("Skipping container {} because SCM reports CLOSED while Recon "
+ + "has state {}.", containerID, reconState);
+ } catch (InvalidStateTransitionException | IOException e) {
+ LOG.warn("Failed to reconcile container {} to CLOSED.", containerID, e);
+ }
+ return 0;
+ }
+
+ private int rebuildContainerFromScm(ContainerID containerID,
+ HddsProtos.LifeCycleState scmState) {
+ try {
+ List<ContainerInfo> infos = scmServiceProvider.getListOfContainerInfos(
+ containerID, 1, scmState);
+ if (infos.isEmpty() || !infos.get(0).containerID().equals(containerID)) {
+ LOG.debug("Container {} no longer in SCM state {}; skipping rebuild.",
+ containerID, scmState);
+ return 0;
+ }
+ containerManager.deleteContainer(containerID);
+ containerManager.addNewContainer(new ContainerWithPipeline(infos.get(0),
null));
+ LOG.info("Rebuilt container {} in Recon from DELETED to SCM state {}.",
+ containerID, scmState);
+ return 1;
+ } catch (IOException e) {
+ LOG.warn("Failed to rebuild container {} from SCM state {}.",
+ containerID, scmState, e);
+ return 0;
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // DELETED sync — SCM-driven, transition only for existing containers.
+ // ---------------------------------------------------------------------------
+
+ /**
+ * Retires containers that SCM has fully deleted (state = DELETED) but Recon
+ * still holds as CLOSED or QUASI_CLOSED.
+ *
+ * <p>Only SCM's DELETED list is scanned — not DELETING. Reason: if we
+ * processed DELETING, we would drive Recon to the intermediate DELETING state
+ * and leave it there until the next cycle. In the next cycle, Recon would be
+ * DELETING but the condition checks CLOSED || QUASI_CLOSED — causing the
+ * container to be stuck at DELETING forever. By waiting for SCM to confirm
+ * full deletion (DELETED), we transition Recon atomically from
+ * CLOSED/QUASI_CLOSED → DELETING → DELETED in a single call with no
+ * cross-cycle intermediate state.
+ *
+ * <p>Uses {@code getListOfContainerInfos} rather than
+ * {@code getExistContainerWithPipelinesInBatch} because DELETED sync needs the
+ * DELETED container metadata but does not need pipeline resolution.
+ *
+ * @return {@code true} if all RPC calls completed without error
+ */
+ private boolean syncDeletedContainers() {
+ try {
+ // getListOfContainerInfos returns ContainerInfo objects (~86 bytes each
+ // on wire). Use safeContainerInfoBatchSize (not safeContainerWithPipelineBatchSize)
+ // because ContainerInfo does NOT include Pipeline or DatanodeDetails.
+ // At batchSize=50K: 50K × 86 bytes ≈ 4 MB per page — well within 128 MB IPC limit.
+ int configuredBatch = ozoneConfiguration.getInt(
+ OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE,
+ OZONE_RECON_SCM_DELETED_CONTAINER_CHECK_BATCH_SIZE_DEFAULT);
+ int batchSize = safeContainerInfoBatchSize(configuredBatch);
+ int retiredCount = 0;
+
+ // Paginate SCM's DELETED list using getListOfContainerInfos (not IDs-only).
+ // We need full ContainerInfo to handle two cases correctly:
+ // 1. Containers absent from Recon: added via addNewContainer with their
+ // actual replication config (RATIS or EC) — using IDs only would
+ // require a placeholder config that could be wrong for EC containers.
+ // 2. Containers present in Recon: retired using retireContainerToDeleted.
+ //
+ // Memory: one page (batchSize ContainerInfo objects) in memory at a time.
+ // Each page is processed and GC'd before the next page is fetched — never
+ // millions of objects simultaneously.
+ //
+ // We do NOT scan the DELETING list: processing DELETING would drive Recon
+ // to an intermediate DELETING state across cycles (stuck). We wait for SCM
+ // to confirm full deletion (DELETED) and then retire atomically.
+ ContainerID start = ContainerID.valueOf(1);
+ while (true) {
+ List<ContainerInfo> page = scmServiceProvider.getListOfContainerInfos(
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]