This is an automated email from the ASF dual-hosted git repository.
sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new dfe4c75df9d HDDS-13637. Add metrics in recon OM sync for staging and
queue based implementation. (#9103)
dfe4c75df9d is described below
commit dfe4c75df9dd28d9d4b206330a127f70230c6538
Author: Devesh Kumar Singh <[email protected]>
AuthorDate: Mon Oct 13 16:44:22 2025 +0530
HDDS-13637. Add metrics in recon OM sync for staging and queue based
implementation. (#9103)
---
.../recon/metrics/OzoneManagerSyncMetrics.java | 12 --
.../ozone/recon/metrics/ReconSyncMetrics.java | 182 +++++++++++++++++++++
.../recon/metrics/ReconTaskControllerMetrics.java | 161 ++++++++++++++++++
.../ozone/recon/metrics/ReconTaskMetrics.java | 182 +++++++++++++++++++++
.../spi/impl/OzoneManagerServiceProviderImpl.java | 80 +++++++--
.../ozone/recon/tasks/OMUpdateEventBuffer.java | 35 +++-
.../ozone/recon/tasks/ReconTaskControllerImpl.java | 123 +++++++++++---
.../ozone/recon/tasks/TestOMUpdateEventBuffer.java | 2 +-
8 files changed, 720 insertions(+), 57 deletions(-)
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/OzoneManagerSyncMetrics.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/OzoneManagerSyncMetrics.java
index 403a08b5b09..dced0d0e63d 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/OzoneManagerSyncMetrics.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/OzoneManagerSyncMetrics.java
@@ -25,7 +25,6 @@
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeFloat;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
-import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.ozone.OzoneConsts;
/**
@@ -44,9 +43,6 @@ public final class OzoneManagerSyncMetrics {
@Metric(about = "Number of OM snapshot requests that failed.")
private MutableCounterLong numSnapshotRequestsFailed;
- @Metric(about = "OM snapshot request latency")
- private MutableRate snapshotRequestLatency;
-
@Metric(about = "Number of OM delta requests made by Recon that had " +
"at least 1 update in the response.")
private MutableCounterLong numNonZeroDeltaRequests;
@@ -86,10 +82,6 @@ public void incrNumSnapshotRequestsFailed() {
this.numSnapshotRequestsFailed.incr();
}
- public void updateSnapshotRequestLatency(long time) {
- this.snapshotRequestLatency.add(time);
- }
-
/**
 * Records one failed OM delta request made by Recon.
 * Increments numDeltaRequestsFailed (read back by getNumDeltaRequestsFailed),
 * not the snapshot-request failure counter.
 */
public void incrNumDeltaRequestsFailed() {
  this.numDeltaRequestsFailed.incr();
}
@@ -114,10 +106,6 @@ public long getNumSnapshotRequestsFailed() {
return numSnapshotRequestsFailed.value();
}
- MutableRate getSnapshotRequestLatency() {
- return snapshotRequestLatency;
- }
-
public long getNumDeltaRequestsFailed() {
return numDeltaRequestsFailed.value();
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconSyncMetrics.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconSyncMetrics.java
new file mode 100644
index 00000000000..5eb35032ef4
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconSyncMetrics.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.metrics;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * Metrics for Recon OM synchronization operations.
+ * This class tracks delta and full snapshot sync operations between Recon and
OM.
+ */
[email protected]
+@Metrics(about = "Recon OM Sync Metrics", context = OzoneConsts.OZONE)
+public final class ReconSyncMetrics {
+
+ private static final String SOURCE_NAME =
+ ReconSyncMetrics.class.getSimpleName();
+
+ // Delta Fetch Operations
+ @Metric(about = "Time taken to fetch delta updates from OM")
+ private MutableRate deltaFetchDuration;
+
+ @Metric(about = "Count of successful delta fetch operations")
+ private MutableCounterLong deltaFetchSuccess;
+
+ @Metric(about = "Count of failed delta fetch operations")
+ private MutableCounterLong deltaFetchFailures;
+
+ @Metric(about = "Total size of delta data fetched in bytes")
+ private MutableCounterLong deltaDataFetchSize;
+
+ // Delta Apply Operations (Conversion + DB Apply Combined)
+ @Metric(about = "Time taken to apply delta updates to Recon OM DB")
+ private MutableRate deltaApplyDuration;
+
+ @Metric(about = "Count of failed delta apply operations")
+ private MutableCounterLong deltaApplyFailures;
+
+ // Full DB Snapshot Metrics
+ @Metric(about = "Time taken to fetch full DB snapshot")
+ private MutableRate fullDBRequestLatency;
+
+ @Metric(about = "Total count of full DB fetch requests made")
+ private MutableCounterLong fullDBFetchRequests;
+
+ @Metric(about = "Total size of downloaded snapshots in bytes")
+ private MutableCounterLong snapshotSizeBytes;
+
+ @Metric(about = "Count of successful snapshot downloads")
+ private MutableCounterLong snapshotDownloadSuccess;
+
+ @Metric(about = "Count of failed snapshot downloads")
+ private MutableCounterLong snapshotDownloadFailures;
+
+ private ReconSyncMetrics() {
+ }
+
+ public static ReconSyncMetrics create() {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ return ms.register(SOURCE_NAME,
+ "Recon OM Sync Metrics",
+ new ReconSyncMetrics());
+ }
+
+ public void unRegister() {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ ms.unregisterSource(SOURCE_NAME);
+ }
+
+ // Delta Fetch Operations
+ public void updateDeltaFetchDuration(long duration) {
+ this.deltaFetchDuration.add(duration);
+ }
+
+ public void incrDeltaFetchSuccess() {
+ this.deltaFetchSuccess.incr();
+ }
+
+ public void incrDeltaFetchFailures() {
+ this.deltaFetchFailures.incr();
+ }
+
+ public void incrDeltaDataFetchSize(long size) {
+ this.deltaDataFetchSize.incr(size);
+ }
+
+ // Delta Apply Operations
+ public void updateDeltaApplyDuration(long duration) {
+ this.deltaApplyDuration.add(duration);
+ }
+
+ public void incrDeltaApplyFailures() {
+ this.deltaApplyFailures.incr();
+ }
+
+ // Full DB Snapshot Operations
+ public void updateFullDBRequestLatency(long duration) {
+ this.fullDBRequestLatency.add(duration);
+ }
+
+ public void incrFullDBFetchRequests() {
+ this.fullDBFetchRequests.incr();
+ }
+
+ public void incrSnapshotSizeBytes(long size) {
+ this.snapshotSizeBytes.incr(size);
+ }
+
+ public void incrSnapshotDownloadSuccess() {
+ this.snapshotDownloadSuccess.incr();
+ }
+
+ public void incrSnapshotDownloadFailures() {
+ this.snapshotDownloadFailures.incr();
+ }
+
+ // Getters for testing
+ public long getDeltaFetchSuccess() {
+ return deltaFetchSuccess.value();
+ }
+
+ public long getDeltaFetchFailures() {
+ return deltaFetchFailures.value();
+ }
+
+ public long getDeltaDataFetchSize() {
+ return deltaDataFetchSize.value();
+ }
+
+ public long getDeltaApplyFailures() {
+ return deltaApplyFailures.value();
+ }
+
+ public MutableRate getDeltaFetchDuration() {
+ return deltaFetchDuration;
+ }
+
+ public MutableRate getDeltaApplyDuration() {
+ return deltaApplyDuration;
+ }
+
+ public long getFullDBFetchRequests() {
+ return fullDBFetchRequests.value();
+ }
+
+ public long getSnapshotSizeBytes() {
+ return snapshotSizeBytes.value();
+ }
+
+ public long getSnapshotDownloadSuccess() {
+ return snapshotDownloadSuccess.value();
+ }
+
+ public long getSnapshotDownloadFailures() {
+ return snapshotDownloadFailures.value();
+ }
+
+ public MutableRate getFullDBRequestLatency() {
+ return fullDBRequestLatency;
+ }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconTaskControllerMetrics.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconTaskControllerMetrics.java
new file mode 100644
index 00000000000..8ab016b2485
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconTaskControllerMetrics.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.metrics;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * Metrics for Recon Task Controller operations.
+ * This class tracks queue management and system-wide reprocess operations.
+ */
[email protected]
+@Metrics(about = "Recon Task Controller Metrics", context = OzoneConsts.OZONE)
+public final class ReconTaskControllerMetrics {
+
+ private static final String SOURCE_NAME =
+ ReconTaskControllerMetrics.class.getSimpleName();
+
+ // Queue Management Metrics
+ @Metric(about = "Current number of Recon events including OM DB Update batch
events and Recon reinit events" +
+ " in the queue")
+ private MutableGaugeLong eventCurrentQueueSize;
+
+ @Metric(about = "Total count of OM DB Update events plus Recon reinit events
buffered since startup")
+ private MutableCounterLong eventBufferedCount;
+
+ @Metric(about = "Count of events dropped due to buffer issues")
+ private MutableCounterLong eventDropCount;
+
+ @Metric(about = "Total count of all Recon events processed")
+ private MutableCounterLong totalEventCount;
+
+ // System-Wide Reprocess Failure Categories
+ @Metric(about = "Count of checkpoint creation failures")
+ private MutableCounterLong reprocessCheckpointFailures;
+
+ @Metric(about = "Count of reprocess execution failures")
+ private MutableCounterLong reprocessExecutionFailures;
+
+ @Metric(about = "Count of stage database replacement failures")
+ private MutableCounterLong reprocessStageDatabaseFailures;
+
+ // System-Wide Reprocess Success and Submission Tracking
+ @Metric(about = "Count of all successful reprocess executions")
+ private MutableCounterLong reprocessSuccessCount;
+
+ @Metric(about = "Total count of reinitialization events submitted to queue")
+ private MutableCounterLong totalReprocessSubmittedToQueue;
+
+ private ReconTaskControllerMetrics() {
+ }
+
+ public static ReconTaskControllerMetrics create() {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ return ms.register(SOURCE_NAME,
+ "Recon Task Controller Metrics",
+ new ReconTaskControllerMetrics());
+ }
+
+ public void unRegister() {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ ms.unregisterSource(SOURCE_NAME);
+ }
+
+ // Queue Management Operations
+ public void setEventCurrentQueueSize(long size) {
+ this.eventCurrentQueueSize.set(size);
+ }
+
+ public void incrEventBufferedCount(long count) {
+ this.eventBufferedCount.incr(count);
+ }
+
+ public void incrEventDropCount(long count) {
+ this.eventDropCount.incr(count);
+ }
+
+ public void incrTotalEventCount(long count) {
+ this.totalEventCount.incr(count);
+ }
+
+ // Reprocess Failure Operations
+ public void incrReprocessCheckpointFailures() {
+ this.reprocessCheckpointFailures.incr();
+ }
+
+ public void incrReprocessExecutionFailures() {
+ this.reprocessExecutionFailures.incr();
+ }
+
+ public void incrReprocessStageDatabaseFailures() {
+ this.reprocessStageDatabaseFailures.incr();
+ }
+
+ // Reprocess Success Operations
+ public void incrReprocessSuccessCount() {
+ this.reprocessSuccessCount.incr();
+ }
+
+ public void incrTotalReprocessSubmittedToQueue() {
+ this.totalReprocessSubmittedToQueue.incr();
+ }
+
+ // Getters for testing
+ public long getEventCurrentQueueSize() {
+ return eventCurrentQueueSize.value();
+ }
+
+ public long getEventBufferedCount() {
+ return eventBufferedCount.value();
+ }
+
+ public long getEventDropCount() {
+ return eventDropCount.value();
+ }
+
+ public long getTotalEventCount() {
+ return totalEventCount.value();
+ }
+
+ public long getReprocessCheckpointFailures() {
+ return reprocessCheckpointFailures.value();
+ }
+
+ public long getReprocessExecutionFailures() {
+ return reprocessExecutionFailures.value();
+ }
+
+ public long getReprocessStageDatabaseFailures() {
+ return reprocessStageDatabaseFailures.value();
+ }
+
+ public long getReprocessSuccessCount() {
+ return reprocessSuccessCount.value();
+ }
+
+ public long getTotalReprocessSubmittedToQueue() {
+ return totalReprocessSubmittedToQueue.value();
+ }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconTaskMetrics.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconTaskMetrics.java
new file mode 100644
index 00000000000..68eca5b3a9e
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconTaskMetrics.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.metrics;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * Per-task metrics for Recon task delta processing and reprocess operations.
+ * Provides granular visibility into individual task performance.
+ */
[email protected]
+@Metrics(about = "Recon Task Metrics", context = OzoneConsts.OZONE)
+public final class ReconTaskMetrics implements MetricsSource {
+
+ private static final String SOURCE_NAME =
+ ReconTaskMetrics.class.getSimpleName();
+
+ private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);
+
+ // Static metric required for Hadoop Metrics framework registration
+ @Metric(about = "Total number of unique tasks tracked")
+ private MutableCounterLong numTasksTracked;
+
+ // Per-task delta processing metrics stored in ConcurrentMaps
+ private final ConcurrentMap<String, MutableCounterLong>
taskDeltaProcessingSuccess =
+ new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, MutableCounterLong>
taskDeltaProcessingFailures =
+ new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, MutableRate> taskDeltaProcessingDuration
=
+ new ConcurrentHashMap<>();
+
+ // Per-task reprocess metrics
+ private final ConcurrentMap<String, MutableCounterLong>
taskReprocessFailures =
+ new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, MutableRate> taskReprocessDuration =
+ new ConcurrentHashMap<>();
+
+ private ReconTaskMetrics() {
+ }
+
+ public static ReconTaskMetrics create() {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ return ms.register(SOURCE_NAME,
+ "Recon Task Metrics",
+ new ReconTaskMetrics());
+ }
+
+ public void unRegister() {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ ms.unregisterSource(SOURCE_NAME);
+ }
+
+ // Task Delta Processing Operations
+ public void incrTaskDeltaProcessingSuccess(String taskName) {
+ taskDeltaProcessingSuccess
+ .computeIfAbsent(taskName, k ->
+ registry.newCounter(
+ "TaskDeltaProcessingSuccess_" + sanitizeTaskName(taskName),
+ "Success count for task " + taskName,
+ 0L))
+ .incr();
+ }
+
+ public void incrTaskDeltaProcessingFailures(String taskName) {
+ taskDeltaProcessingFailures
+ .computeIfAbsent(taskName, k ->
+ registry.newCounter(
+ "TaskDeltaProcessingFailures_" + sanitizeTaskName(taskName),
+ "Failure count for task " + taskName,
+ 0L))
+ .incr();
+ }
+
+ public void updateTaskDeltaProcessingDuration(String taskName, long
duration) {
+ taskDeltaProcessingDuration
+ .computeIfAbsent(taskName, k ->
+ registry.newRate(
+ "TaskDeltaProcessingDuration_" + sanitizeTaskName(taskName),
+ "Processing duration for task " + taskName))
+ .add(duration);
+ }
+
+ // Task Reprocess Operations
+ public void incrTaskReprocessFailures(String taskName) {
+ taskReprocessFailures
+ .computeIfAbsent(taskName, k ->
+ registry.newCounter(
+ "TaskReprocessFailures_" + sanitizeTaskName(taskName),
+ "Reprocess failure count for task " + taskName,
+ 0L))
+ .incr();
+ }
+
+ public void updateTaskReprocessDuration(String taskName, long duration) {
+ taskReprocessDuration
+ .computeIfAbsent(taskName, k ->
+ registry.newRate(
+ "TaskReprocessDuration_" + sanitizeTaskName(taskName),
+ "Reprocess duration for task " + taskName))
+ .add(duration);
+ }
+
+ /**
+ * Sanitize task name for use in metric names.
+ * Replaces non-alphanumeric characters with underscores.
+ */
+ private String sanitizeTaskName(String taskName) {
+ return taskName.replaceAll("[^a-zA-Z0-9]", "_");
+ }
+
+ // Getters for testing
+ public long getTaskDeltaProcessingSuccess(String taskName) {
+ MutableCounterLong counter = taskDeltaProcessingSuccess.get(taskName);
+ return counter != null ? counter.value() : 0L;
+ }
+
+ public long getTaskDeltaProcessingFailures(String taskName) {
+ MutableCounterLong counter = taskDeltaProcessingFailures.get(taskName);
+ return counter != null ? counter.value() : 0L;
+ }
+
+ public MutableRate getTaskDeltaProcessingDuration(String taskName) {
+ return taskDeltaProcessingDuration.get(taskName);
+ }
+
+ public long getTaskReprocessFailures(String taskName) {
+ MutableCounterLong counter = taskReprocessFailures.get(taskName);
+ return counter != null ? counter.value() : 0L;
+ }
+
+ public MutableRate getTaskReprocessDuration(String taskName) {
+ return taskReprocessDuration.get(taskName);
+ }
+
+ @Override
+ public void getMetrics(MetricsCollector collector, boolean all) {
+ MetricsRecordBuilder recordBuilder = collector.addRecord(SOURCE_NAME);
+
+ // Snapshot static metric
+ numTasksTracked.snapshot(recordBuilder, all);
+
+ // Snapshot all dynamic per-task metrics
+ taskDeltaProcessingSuccess.values().forEach(
+ metric -> metric.snapshot(recordBuilder, all));
+ taskDeltaProcessingFailures.values().forEach(
+ metric -> metric.snapshot(recordBuilder, all));
+ taskDeltaProcessingDuration.values().forEach(
+ metric -> metric.snapshot(recordBuilder, all));
+ taskReprocessFailures.values().forEach(
+ metric -> metric.snapshot(recordBuilder, all));
+ taskReprocessDuration.values().forEach(
+ metric -> metric.snapshot(recordBuilder, all));
+ }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
index cf3d03964af..9cea23da316 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
@@ -92,6 +92,7 @@
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.TarExtractor;
import org.apache.hadoop.ozone.recon.metrics.OzoneManagerSyncMetrics;
+import org.apache.hadoop.ozone.recon.metrics.ReconSyncMetrics;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
import org.apache.hadoop.ozone.recon.tasks.OMDBUpdatesHandler;
@@ -130,6 +131,7 @@ public class OzoneManagerServiceProviderImpl
private ReconTaskController reconTaskController;
private ReconUtils reconUtils;
private OzoneManagerSyncMetrics metrics;
+ private ReconSyncMetrics reconSyncMetrics;
private final long deltaUpdateLimit;
private final long omDBLagThreshold;
@@ -219,6 +221,7 @@ public OzoneManagerServiceProviderImpl(
this.ozoneManagerClient = ozoneManagerClient;
this.configuration = configuration;
this.metrics = OzoneManagerSyncMetrics.create();
+ this.reconSyncMetrics = ReconSyncMetrics.create();
this.deltaUpdateLimit = deltaUpdateLimits;
this.isSyncDataFromOMRunning = new AtomicBoolean();
this.threadNamePrefix =
@@ -374,6 +377,7 @@ public void stop() throws Exception {
scheduler.shutdownNow();
tarExtractor.stop();
metrics.unRegister();
+ reconSyncMetrics.unRegister();
connectionFactory.destroy();
}
@@ -492,32 +496,50 @@ connectionFactory, getOzoneManagerSnapshotUrl(),
isOmSpnegoEnabled()).getInputSt
boolean updateReconOmDBWithNewSnapshot() throws IOException {
// Check permissions of the Recon DB directory
checkAndValidateReconDbPermissions();
+
+ // Track full DB fetch request
+ reconSyncMetrics.incrFullDBFetchRequests();
+
// Obtain the current DB snapshot from OM and
// update the in house OM metadata managed DB instance.
long startTime = Time.monotonicNow();
DBCheckpoint dbSnapshot = getOzoneManagerDBSnapshot();
- metrics.updateSnapshotRequestLatency(Time.monotonicNow() - startTime);
+ long fullDBLatency = Time.monotonicNow() - startTime;
+
+ reconSyncMetrics.updateFullDBRequestLatency(fullDBLatency);
if (dbSnapshot == null) {
LOG.error("Failed to obtain a valid DB snapshot from Ozone Manager. This
could be due to " +
"missing SST files or other fetch issues.");
+ reconSyncMetrics.incrSnapshotDownloadFailures();
return false;
}
if (dbSnapshot.getCheckpointLocation() == null) {
LOG.error("Snapshot checkpoint location is null, indicating a failure to
properly fetch or " +
"store the snapshot.");
+ reconSyncMetrics.incrSnapshotDownloadFailures();
return false;
}
LOG.info("Attempting to update Recon OM DB with new snapshot located at:
{}",
dbSnapshot.getCheckpointLocation());
try {
-
omMetadataManager.updateOmDB(dbSnapshot.getCheckpointLocation().toFile(), true);
+ // Calculate snapshot size
+ File snapshotDir = dbSnapshot.getCheckpointLocation().toFile();
+ long snapshotSize = FileUtils.sizeOfDirectory(snapshotDir);
+ reconSyncMetrics.incrSnapshotSizeBytes(snapshotSize);
+
+ omMetadataManager.updateOmDB(snapshotDir, true);
+
+ // Track successful snapshot download
+ reconSyncMetrics.incrSnapshotDownloadSuccess();
+
LOG.info("Successfully updated Recon OM DB with new snapshot.");
return true;
} catch (IOException e) {
LOG.error("Unable to refresh Recon OM DB Snapshot.", e);
+ reconSyncMetrics.incrSnapshotDownloadFailures();
return false;
}
}
@@ -565,34 +587,70 @@ Long getAndApplyDeltaUpdatesFromOM(
ImmutablePair<Boolean, Long> innerGetAndApplyDeltaUpdatesFromOM(long
fromSequenceNumber,
OMDBUpdatesHandler omdbUpdatesHandler)
throws IOException, RocksDBException {
+ // Track delta fetch operation
+ long deltaFetchStartTime = Time.monotonicNow();
+
DBUpdatesRequest dbUpdatesRequest = DBUpdatesRequest.newBuilder()
.setSequenceNumber(fromSequenceNumber)
.setLimitCount(deltaUpdateLimit)
.build();
DBUpdates dbUpdates = ozoneManagerClient.getDBUpdates(dbUpdatesRequest);
+
+ // Update delta fetch duration
+ long deltaFetchDuration = Time.monotonicNow() - deltaFetchStartTime;
+ reconSyncMetrics.updateDeltaFetchDuration(deltaFetchDuration);
+
int numUpdates = 0;
long latestSequenceNumberOfOM = -1L;
if (null != dbUpdates && dbUpdates.getCurrentSequenceNumber() != -1) {
+ // Delta fetch succeeded
+ reconSyncMetrics.incrDeltaFetchSuccess();
+
latestSequenceNumberOfOM = dbUpdates.getLatestSequenceNumber();
RDBStore rocksDBStore = (RDBStore) omMetadataManager.getStore();
final RocksDatabase rocksDB = rocksDBStore.getDb();
numUpdates = dbUpdates.getData().size();
if (numUpdates > 0) {
metrics.incrNumUpdatesInDeltaTotal(numUpdates);
+
+ // Track delta data fetch size
+ long totalDataSize = 0;
+ for (byte[] data : dbUpdates.getData()) {
+ totalDataSize += data.length;
+ }
+ reconSyncMetrics.incrDeltaDataFetchSize(totalDataSize);
}
- for (byte[] data : dbUpdates.getData()) {
- try (ManagedWriteBatch writeBatch = new ManagedWriteBatch(data)) {
- // Events gets populated in events list in OMDBUpdatesHandler with
call back for put/delete/update
- writeBatch.iterate(omdbUpdatesHandler);
- // Commit the OM DB transactions in recon rocks DB and sync here.
- try (RDBBatchOperation rdbBatchOperation =
- new RDBBatchOperation(writeBatch)) {
- try (ManagedWriteOptions wOpts = new ManagedWriteOptions()) {
- rdbBatchOperation.commit(rocksDB, wOpts);
+
+ // Track delta apply (conversion + DB apply combined)
+ long deltaApplyStartTime = Time.monotonicNow();
+
+ try {
+ for (byte[] data : dbUpdates.getData()) {
+ try (ManagedWriteBatch writeBatch = new ManagedWriteBatch(data)) {
+ // Events gets populated in events list in OMDBUpdatesHandler with
call back for put/delete/update
+ writeBatch.iterate(omdbUpdatesHandler);
+ // Commit the OM DB transactions in recon rocks DB and sync here.
+ try (RDBBatchOperation rdbBatchOperation =
+ new RDBBatchOperation(writeBatch)) {
+ try (ManagedWriteOptions wOpts = new ManagedWriteOptions()) {
+ rdbBatchOperation.commit(rocksDB, wOpts);
+ }
}
}
}
+
+ // Update delta apply duration (successful)
+ long deltaApplyDuration = Time.monotonicNow() - deltaApplyStartTime;
+ reconSyncMetrics.updateDeltaApplyDuration(deltaApplyDuration);
+
+ } catch (RocksDBException | IOException e) {
+ // Track delta apply failures
+ reconSyncMetrics.incrDeltaApplyFailures();
+ throw e;
}
+ } else {
+ // Delta fetch failed
+ reconSyncMetrics.incrDeltaFetchFailures();
}
long lag = latestSequenceNumberOfOM == -1 ? 0 :
latestSequenceNumberOfOM - getCurrentOMDBSequenceNumber();
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBuffer.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBuffer.java
index e089b6c1ab6..9f6afa84105 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBuffer.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBuffer.java
@@ -21,6 +21,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.ozone.recon.metrics.ReconTaskControllerMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,20 +32,22 @@
*/
public class OMUpdateEventBuffer {
private static final Logger LOG =
LoggerFactory.getLogger(OMUpdateEventBuffer.class);
-
+
private final BlockingQueue<ReconEvent> eventQueue;
private final int maxCapacity;
private final AtomicLong totalBufferedEvents = new AtomicLong(0);
private final AtomicLong droppedBatches = new AtomicLong(0);
-
- public OMUpdateEventBuffer(int maxCapacity) {
+ private final ReconTaskControllerMetrics metrics;
+
+ public OMUpdateEventBuffer(int maxCapacity, ReconTaskControllerMetrics
metrics) {
this.maxCapacity = maxCapacity;
this.eventQueue = new LinkedBlockingQueue<>(maxCapacity);
+ this.metrics = metrics;
}
/**
* Add an event to the buffer.
- *
+ *
* @param event The event to buffer
* @return true if successfully buffered, false if queue full
*/
@@ -52,10 +55,23 @@ public boolean offer(ReconEvent event) {
boolean added = eventQueue.offer(event);
if (added) {
totalBufferedEvents.addAndGet(event.getEventCount());
+
+ // Update metrics: track events buffered (entering queue)
+ if (metrics != null) {
+ metrics.incrEventBufferedCount(event.getEventCount());
+ metrics.setEventCurrentQueueSize(eventQueue.size());
+ }
+
LOG.debug("Buffered event {} with {} events. Queue size: {}, Total
buffered events: {}",
event.getEventType(), event.getEventCount(), eventQueue.size(),
totalBufferedEvents.get());
} else {
droppedBatches.incrementAndGet();
+
+ // Update metrics: track dropped events
+ if (metrics != null) {
+ metrics.incrEventDropCount(event.getEventCount());
+ }
+
LOG.warn("Event buffer queue is full (capacity: {}). Dropping event {}
with {} events. " +
"Total dropped batches: {}",
maxCapacity, event.getEventType(), event.getEventCount(),
droppedBatches.get());
@@ -65,7 +81,7 @@ public boolean offer(ReconEvent event) {
/**
* Poll an event from the buffer with timeout.
- *
+ *
* @param timeoutMs timeout in milliseconds
* @return event or null if timeout
*/
@@ -74,7 +90,14 @@ public ReconEvent poll(long timeoutMs) {
ReconEvent event = eventQueue.poll(timeoutMs,
java.util.concurrent.TimeUnit.MILLISECONDS);
if (event != null) {
totalBufferedEvents.addAndGet(-event.getEventCount());
- LOG.debug("Polled event {} with {} events. Queue size: {}, Total
buffered events: {}",
+
+ // Update metrics: track events processed (exiting queue)
+ if (metrics != null) {
+ metrics.incrTotalEventCount(event.getEventCount());
+ metrics.setEventCurrentQueueSize(eventQueue.size());
+ }
+
+ LOG.debug("Polled event {} with {} events. Queue size: {}, Total
buffered events: {}",
event.getEventType(), event.getEventCount(), eventQueue.size(),
totalBufferedEvents.get());
}
return event;
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
index 721cffc50fd..f9b3d40d118 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
@@ -50,6 +50,8 @@
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.recon.ReconConstants;
+import org.apache.hadoop.ozone.recon.metrics.ReconTaskControllerMetrics;
+import org.apache.hadoop.ozone.recon.metrics.ReconTaskMetrics;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconFileMetadataManager;
@@ -60,6 +62,7 @@
import org.apache.hadoop.ozone.recon.tasks.types.TaskExecutionException;
import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdater;
import
org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager;
+import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,7 +89,11 @@ public class ReconTaskControllerImpl implements
ReconTaskController {
private final AtomicBoolean tasksFailed = new AtomicBoolean(false);
private volatile ReconOMMetadataManager currentOMMetadataManager;
private final OzoneConfiguration configuration;
-
+
+ // Metrics
+ private final ReconTaskControllerMetrics controllerMetrics;
+ private final ReconTaskMetrics taskMetrics;
+
// Retry logic for event processing failures
private AtomicInteger eventProcessRetryCount = new AtomicInteger(0);
private AtomicLong lastRetryTimestamp = new AtomicLong(0);
@@ -113,9 +120,14 @@ public ReconTaskControllerImpl(OzoneConfiguration
configuration,
threadCount = configuration.getInt(OZONE_RECON_TASK_THREAD_COUNT_KEY,
OZONE_RECON_TASK_THREAD_COUNT_DEFAULT);
this.taskStatusUpdaterManager = taskStatusUpdaterManager;
+
+ // Initialize metrics
+ this.controllerMetrics = ReconTaskControllerMetrics.create();
+ this.taskMetrics = ReconTaskMetrics.create();
+
int eventBufferCapacity =
configuration.getInt(OZONE_RECON_OM_EVENT_BUFFER_CAPACITY,
OZONE_RECON_OM_EVENT_BUFFER_CAPACITY_DEFAULT);
- this.eventBuffer = new OMUpdateEventBuffer(eventBufferCapacity);
+ this.eventBuffer = new OMUpdateEventBuffer(eventBufferCapacity,
controllerMetrics);
for (ReconOmTask task : tasks) {
registerTask(task);
}
@@ -187,6 +199,10 @@ public synchronized boolean
reInitializeTasks(ReconOMMetadataManager omMetadataM
stagedReconDBProvider = reconDBProvider.getStagedReconDBProvider();
} catch (IOException e) {
LOG.error("Failed to get staged Recon DB provider for reinitialization
of tasks.", e);
+
+ // Track checkpoint creation failure
+ controllerMetrics.incrReprocessCheckpointFailures();
+
recordAllTaskStatus(localReconOmTaskMap, -1, -1);
return false;
}
@@ -202,32 +218,51 @@ public synchronized boolean
reInitializeTasks(ReconOMMetadataManager omMetadataM
AtomicBoolean isRunSuccessful = new AtomicBoolean(true);
try {
CompletableFuture.allOf(tasks.stream()
- .map(task -> CompletableFuture.supplyAsync(() -> {
- try {
- return task.call();
- } catch (Exception e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
+ .map(task -> {
+ // Track reprocess duration per task - start time recorded before
async execution
+ long reprocessStartTime = Time.monotonicNow();
+
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ ReconOmTask.TaskResult result = task.call();
+ return result;
+ } catch (Exception e) {
+ // Track reprocess failure per task
+ taskMetrics.incrTaskReprocessFailures(task.getTaskName());
+
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ // Wrap the exception with the task name
+ throw new TaskExecutionException(task.getTaskName(), e);
}
- // Wrap the exception with the task name
- throw new TaskExecutionException(task.getTaskName(), e);
- }
- }, executorService).thenAccept(result -> {
- if (!result.isTaskSuccess()) {
- String taskName = result.getTaskName();
- LOG.error("Init failed for task {}.", taskName);
+ }, executorService).thenAccept(result -> {
+ // Update reprocess duration after task completes (includes
queue time)
+ long reprocessDuration = Time.monotonicNow() -
reprocessStartTime;
+ taskMetrics.updateTaskReprocessDuration(task.getTaskName(),
reprocessDuration);
+
+ if (!result.isTaskSuccess()) {
+ String taskName = result.getTaskName();
+ LOG.error("Init failed for task {}.", taskName);
+
+ // Track reprocess failure per task
+ taskMetrics.incrTaskReprocessFailures(taskName);
+
+ isRunSuccessful.set(false);
+ }
+ }).exceptionally(ex -> {
+ LOG.error("Task failed with exception: ", ex);
isRunSuccessful.set(false);
- }
- }).exceptionally(ex -> {
- LOG.error("Task failed with exception: ", ex);
- isRunSuccessful.set(false);
- if (ex.getCause() instanceof TaskExecutionException) {
- TaskExecutionException taskEx = (TaskExecutionException)
ex.getCause();
- String taskName = taskEx.getTaskName();
- LOG.error("The above error occurred while trying to execute
task: {}", taskName);
- }
- return null;
- })).toArray(CompletableFuture[]::new)).join();
+ if (ex.getCause() instanceof TaskExecutionException) {
+ TaskExecutionException taskEx = (TaskExecutionException)
ex.getCause();
+ String taskName = taskEx.getTaskName();
+ // Track reprocess failure per task
+ taskMetrics.incrTaskReprocessFailures(taskName);
+ LOG.error("The above error occurred while trying to execute
task: {}", taskName);
+ }
+ return null;
+ });
+ }).toArray(CompletableFuture[]::new)).join();
} catch (CompletionException ce) {
LOG.error("Completing all tasks failed with exception ", ce);
isRunSuccessful.set(false);
@@ -244,9 +279,17 @@ public synchronized boolean
reInitializeTasks(ReconOMMetadataManager omMetadataM
reconGlobalStatsManager.reinitialize(reconDBProvider);
reconFileMetadataManager.reinitialize(reconDBProvider);
recordAllTaskStatus(localReconOmTaskMap, 0,
omMetadataManager.getLastSequenceNumberFromDB());
+
+ // Track reprocess success
+ controllerMetrics.incrReprocessSuccessCount();
+
LOG.info("Re-initialization of tasks completed successfully.");
} catch (Exception e) {
LOG.error("Re-initialization of tasks failed.", e);
+
+ // Track stage database failure
+ controllerMetrics.incrReprocessStageDatabaseFailures();
+
recordAllTaskStatus(localReconOmTaskMap, -1, -1);
// reinitialize the Recon OM tasks with the original DB provider
try {
@@ -260,6 +303,10 @@ public synchronized boolean
reInitializeTasks(ReconOMMetadataManager omMetadataM
}
} else {
LOG.error("Re-initialization of tasks failed.");
+
+ // Track reprocess execution failure
+ controllerMetrics.incrReprocessExecutionFailures();
+
try {
stagedReconDBProvider.close();
} catch (Exception e) {
@@ -336,8 +383,17 @@ private void processTasks(
OMUpdateEventBatch events, List<ReconOmTask.TaskResult> failedTasks) {
List<CompletableFuture<Void>> futures = tasks.stream()
.map(task -> CompletableFuture.supplyAsync(() -> {
+ // Track task delta processing duration
+ long taskStartTime = Time.monotonicNow();
+
try {
- return task.call();
+ ReconOmTask.TaskResult result = task.call();
+
+ // Update task delta processing duration
+ long taskDuration = Time.monotonicNow() - taskStartTime;
+ taskMetrics.updateTaskDeltaProcessingDuration(task.getTaskName(),
taskDuration);
+
+ return result;
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
@@ -351,12 +407,19 @@ private void processTasks(
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
if (!result.isTaskSuccess()) {
LOG.error("Task {} failed", taskName);
+
+ // Track task delta processing failure
+ taskMetrics.incrTaskDeltaProcessingFailures(taskName);
+
failedTasks.add(new ReconOmTask.TaskResult.Builder()
.setTaskName(taskName)
.setSubTaskSeekPositions(result.getSubTaskSeekPositions())
.build());
taskStatusUpdater.setLastTaskRunStatus(-1);
} else {
+ // Track task delta processing success
+ taskMetrics.incrTaskDeltaProcessingSuccess(taskName);
+
taskStatusUpdater.setLastTaskRunStatus(0);
taskStatusUpdater.setLastUpdatedSeqNumber(events.getLastSequenceNumber());
}
@@ -368,6 +431,9 @@ private void processTasks(
String taskName = taskEx.getTaskName();
LOG.error("The above error occurred while trying to execute task:
{}", taskName);
+ // Track task delta processing failure
+ taskMetrics.incrTaskDeltaProcessingFailures(taskName);
+
ReconTaskStatusUpdater taskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
taskStatusUpdater.setLastTaskRunStatus(-1);
@@ -498,6 +564,9 @@ public synchronized
ReconTaskController.ReInitializationResult queueReInitializa
LOG.info("Queueing task reinitialization event due to: {} (retry attempt
count: {})", reason,
eventProcessRetryCount.get());
+ // Track reprocess submission
+ controllerMetrics.incrTotalReprocessSubmittedToQueue();
+
ReInitializationResult reInitializationResult =
validateRetryCountAndDelay();
if (null != reInitializationResult) {
return reInitializationResult;
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMUpdateEventBuffer.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMUpdateEventBuffer.java
index 2460cdcd0e5..a73f65721ad 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMUpdateEventBuffer.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMUpdateEventBuffer.java
@@ -37,7 +37,7 @@ public class TestOMUpdateEventBuffer {
@BeforeEach
void setUp() {
- eventBuffer = new OMUpdateEventBuffer(TEST_CAPACITY);
+ eventBuffer = new OMUpdateEventBuffer(TEST_CAPACITY, null);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]