sumitagrawl commented on code in PR #10074: URL: https://github.com/apache/ozone/pull/10074#discussion_r3225550173
########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconScmContainerSyncMetrics.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.metrics; + +import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.util.Time; + +/** + * Metrics for Recon SCM container sync decisions and targeted sync execution. + */ +@InterfaceAudience.Private +@Metrics(about = "Recon SCM Container Sync Metrics", context = OzoneConsts.OZONE) +public final class ReconScmContainerSyncMetrics { + + private static final String SOURCE_NAME = + ReconScmContainerSyncMetrics.class.getSimpleName(); + + /** + * No targeted sync has run yet, or the latest scheduler cycle did not run one. 
+ */ + public static final int TARGETED_SYNC_STATUS_IDLE = 0; + /** + * Targeted sync is currently running. + */ + public static final int TARGETED_SYNC_STATUS_IN_PROGRESS = 1; + /** + * The last targeted sync completed successfully. + */ + public static final int TARGETED_SYNC_STATUS_SUCCESS = 2; + /** + * The last targeted sync completed with one or more failed passes. + */ + public static final int TARGETED_SYNC_STATUS_FAILURE = 3; + + @Metric(about = "Count of events where non-OPEN container drift exceeded " + + "the full SCM DB snapshot threshold") + private MutableCounterLong fullScmDbSnapshotThresholdExceededCount; + + @Metric(about = "Last non-OPEN container drift observed when the full SCM " + + "DB snapshot threshold was exceeded") + private MutableGaugeLong lastFullScmDbSnapshotThresholdExceededNonOpenDrift; + + @Metric(about = "Time between the last two full SCM DB snapshot threshold " + + "exceeded events in milliseconds") + private MutableGaugeLong intervalSinceLastFullScmDbSnapshotThresholdExceededMs; + + @Metric(about = "Last OPEN container drift that triggered targeted sync") + private MutableGaugeLong lastOpenContainerDrift; + + @Metric(about = "Last QUASI_CLOSED container drift that triggered targeted sync") + private MutableGaugeLong lastQuasiClosedContainerDrift; + + @Metric(about = "Last CLOSED container drift that triggered targeted sync") + private MutableGaugeLong lastClosedContainerDrift; + + @Metric(about = "Targeted sync status: 0=idle, 1=in progress, " + + "2=success, 3=failure") + private MutableGaugeInt targetedSyncStatus; + + @Metric(about = "Time taken by the last targeted sync in milliseconds") + private MutableGaugeLong lastTargetedSyncDurationMs; + + private long lastFullSnapshotThresholdExceededTimestampMs; + + private ReconScmContainerSyncMetrics() { + } + + public static ReconScmContainerSyncMetrics create() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register(SOURCE_NAME, + "Recon SCM Container Sync 
Metrics", + new ReconScmContainerSyncMetrics()); + } + + public void unRegister() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.unregisterSource(SOURCE_NAME); + } + + public synchronized void recordFullSnapshotThresholdExceededEvent( + long nonOpenDrift) { + fullScmDbSnapshotThresholdExceededCount.incr(); + lastFullScmDbSnapshotThresholdExceededNonOpenDrift.set(nonOpenDrift); + long now = Time.monotonicNow(); + if (lastFullSnapshotThresholdExceededTimestampMs > 0) { + intervalSinceLastFullScmDbSnapshotThresholdExceededMs.set( + now - lastFullSnapshotThresholdExceededTimestampMs); + } + lastFullSnapshotThresholdExceededTimestampMs = now; + } + + public void recordOpenContainerDrift(long drift) { + lastOpenContainerDrift.set(drift); + } + + public void recordQuasiClosedContainerDrift(long drift) { + lastQuasiClosedContainerDrift.set(drift); + } + + public void recordClosedContainerDrift(long drift) { + lastClosedContainerDrift.set(drift); + } + + public void setTargetedSyncStatus(int status) { + targetedSyncStatus.set(status); + } + + public void setLastTargetedSyncDurationMs(long durationMs) { + lastTargetedSyncDurationMs.set(durationMs); + } + + public long getFullScmDbSnapshotThresholdExceededCount() { Review Comment: can check metrics for full, not required now ########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java: ########## @@ -241,6 +241,12 @@ private void initialize() throws IOException { Objects.requireNonNull(container, "container == null"); containers.addContainer(container); if (container.getState() == LifeCycleState.OPEN) { + if (container.getPipelineID() == null) { Review Comment: we can add comment for case for Recon, if container added but pipeline info not present in Recon, required for other places in this class ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java: ########## @@ -179,33 +185,157 @@ public void 
checkAndAddNewContainerBatch( } /** - * Check if container state is not open. In SCM, container state - * changes to CLOSING first, and then the close command is pushed down - * to Datanodes. Recon 'learns' this from DN, and hence replica state - * will move container state to 'CLOSING'. + * Transitions a container from OPEN to CLOSING, keeping the per-pipeline + * open-container count in {@link #pipelineToOpenContainer} accurate. * - * @param containerID containerID to check - * @param state state to be compared + * <p>Must be called whenever an OPEN container is moved to CLOSING so that + * the pipeline's open-container count stays consistent. Both the DN-report + * driven path ({@link #checkContainerStateAndUpdate}) and the periodic sync + * passes ({@code processSyncedClosedContainer}, {@code syncQuasiClosedContainers}) + * use this method to avoid divergence in the count exposed to the Recon Node API. + * + * <p>If the container was recorded without a pipeline (null pipeline at + * {@code addNewContainer} time) the count decrement is safely skipped. 
+ * + * @param containerID container to advance from OPEN to CLOSING + * @param containerInfo already-fetched {@code ContainerInfo} for the container + * (avoids a redundant lookup inside this method) + * @throws IOException if the state update fails + * @throws InvalidStateTransitionException if the container is not in OPEN state */ - - private void checkContainerStateAndUpdate(ContainerID containerID, - ContainerReplicaProto.State state) - throws IOException, InvalidStateTransitionException { - ContainerInfo containerInfo = getContainer(containerID); - if (containerInfo.getState().equals(HddsProtos.LifeCycleState.OPEN) - && !state.equals(ContainerReplicaProto.State.OPEN) - && isHealthy(state)) { - LOG.info("Container {} has state OPEN, but given state is {}.", - containerID, state); - final PipelineID pipelineID = containerInfo.getPipelineID(); - // subtract open container count from the map + void transitionOpenToClosing(ContainerID containerID, ContainerInfo containerInfo) + throws IOException, InvalidStateTransitionException { + PipelineID pipelineID = containerInfo.getPipelineID(); + if (pipelineID != null) { int curCnt = pipelineToOpenContainer.getOrDefault(pipelineID, 0); if (curCnt == 1) { pipelineToOpenContainer.remove(pipelineID); } else if (curCnt > 0) { pipelineToOpenContainer.put(pipelineID, curCnt - 1); } - updateContainerState(containerID, FINALIZE); + } + updateContainerState(containerID, FINALIZE); // OPEN → CLOSING Review Comment: check if we can reuse the SCM code instead of writing it again -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
