This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ffc6c83f12a IGNITE-27359 Add aipersist runConsistently metrics (#7422)
ffc6c83f12a is described below

commit ffc6c83f12a7c8d02e21d281a07f36a50af577ee
Author: Viacheslav Blinov <[email protected]>
AuthorDate: Fri Feb 27 14:04:16 2026 +0300

    IGNITE-27359 Add aipersist runConsistently metrics (#7422)
---
 .../ignite/internal/metrics/MetricMatchers.java    |  93 +++++++++++++++
 .../checkpoint/CheckpointTimeoutLockTest.java      |  34 +++---
 .../replacement/AbstractPageReplacementTest.java   |  11 +-
 .../persistence/store/FilePageStoreIoTest.java     |  41 +++----
 .../PersistentPageMemoryStorageEngine.java         |  12 +-
 .../PersistentPageMemoryStorageMetricSource.java   |  85 --------------
 .../PersistentPageMemoryStorageMetrics.java        |   3 +-
 .../PersistentPageMemoryTableStorage.java          |  12 +-
 .../mv/PersistentPageMemoryMvPartitionStorage.java |  56 +++++++--
 .../pagememory/mv/RunConsistentlyMetrics.java      | 125 +++++++++++++++++++++
 .../PersistentPageMemoryStorageMetricsTest.java    |   3 +-
 .../pagememory/mv/FailedCheckpointTest.java        |   9 +-
 ...PersistentPageMemoryMvPartitionStorageTest.java |  74 +++++++++++-
 13 files changed, 395 insertions(+), 163 deletions(-)

diff --git a/modules/metrics/src/testFixtures/java/org/apache/ignite/internal/metrics/MetricMatchers.java b/modules/metrics/src/testFixtures/java/org/apache/ignite/internal/metrics/MetricMatchers.java
new file mode 100644
index 00000000000..f8a20843329
--- /dev/null
+++ b/modules/metrics/src/testFixtures/java/org/apache/ignite/internal/metrics/MetricMatchers.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics;
+
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.Arrays;
+import org.hamcrest.FeatureMatcher;
+import org.hamcrest.Matcher;
+
+/**
+ * Hamcrest matchers for testing metrics.
+ */
+public final class MetricMatchers {
+    private MetricMatchers() {
+        // No-op.
+    }
+
+    /**
+     * Creates a matcher that matches a {@link DistributionMetric} with the expected total number of measurements
+     * across all histogram buckets.
+     *
+     * @param expectedMeasuresCount Expected total number of measurements across all buckets.
+     * @return Matcher for distribution metric measures count.
+     */
+    public static Matcher<DistributionMetric> hasMeasurementsCount(long expectedMeasuresCount) {
+        return new FeatureMatcher<>(
+                is(expectedMeasuresCount),
+                "a DistributionMetric with measures count",
+                "measures count") {
+            @Override
+            protected Long featureValueOf(DistributionMetric metric) {
+                return Arrays.stream(metric.value()).sum();
+            }
+        };
+    }
+
+    /**
+     * Creates a matcher that matches a {@link LongMetric} whose value satisfies the given matcher.
+     *
+     * @param valueMatcher Matcher for the metric value.
+     * @return Matcher for long metric value.
+     */
+    public static Matcher<LongMetric> hasValue(Matcher<Long> valueMatcher) {
+        return new FeatureMatcher<>(
+                valueMatcher,
+                "a LongMetric with value",
+                "value") {
+            @Override
+            protected Long featureValueOf(LongMetric metric) {
+                return metric.value();
+            }
+        };
+    }
+
+    /**
+     * Creates a matcher that matches a {@link MetricSet} containing a metric with the given name
+     * that satisfies the given matcher.
+     *
+     * @param name Name of the metric to look up in the metric set.
+     * @param metricMatcher Matcher for the metric.
+     * @param <M> Type of the metric.
+     * @return Matcher for metric set containing the specified metric.
+     */
+    public static <M extends Metric> Matcher<MetricSet> hasMetric(String name, Matcher<M> metricMatcher) {
+        return new FeatureMatcher<>(
+                allOf(notNullValue(), metricMatcher),
+                "a MetricSet with metric named \"" + name + "\"",
+                "metric named \"" + name + "\"") {
+            @Override
+            protected M featureValueOf(MetricSet metricSet) {
+                return metricSet.get(name);
+            }
+        };
+    }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java
index 9aafb84b044..e151550b2c2 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java
@@ -21,6 +21,7 @@ import static java.lang.Thread.currentThread;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.metrics.MetricMatchers.hasMeasurementsCount;
 import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.MUST_TRIGGER;
 import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.NOT_REQUIRED;
 import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
@@ -42,7 +43,6 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.util.Arrays;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -53,7 +53,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
-import org.apache.ignite.internal.metrics.DistributionMetric;
 import org.apache.ignite.internal.pagememory.metrics.CollectionMetricSource;
 import org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -438,17 +437,26 @@ public class CheckpointTimeoutLockTest extends BaseIgniteAbstractTest {
 
         try {
             // Verify metrics start at zero
-            assertDistributionMetricRecordsCount(metrics.readLockAcquisitionTime(), 0L);
+            assertThat(
+                    metrics.readLockAcquisitionTime(),
+                    hasMeasurementsCount(0L)
+            );
 
             // Acquire and immediately release the lock
             timeoutLock.checkpointReadLock();
             timeoutLock.checkpointReadUnlock();
 
             // Verify acquisition was recorded
-            assertDistributionMetricRecordsCount(metrics.readLockAcquisitionTime(), 1L);
+            assertThat(
+                    metrics.readLockAcquisitionTime(),
+                    hasMeasurementsCount(1L)
+            );
 
             // Verify hold time distribution was recorded
-            assertDistributionMetricRecordsCount(metrics.readLockHoldTime(), 1L);
+            assertThat(
+                    metrics.readLockHoldTime(),
+                    hasMeasurementsCount(1L)
+            );
 
             readWriteLock.writeLock();
             runAsync(() -> {
@@ -462,20 +470,4 @@ public class CheckpointTimeoutLockTest extends BaseIgniteAbstractTest {
             timeoutLock.stop();
         }
     }
-
-    /**
-     * Verifies that the specified distribution metric has recorded the expected total number of measurements.
-     *
-     * <p>
-     * Rather than checking individual histogram buckets, this method aggregates all recorded measurements across every bucket
-     * and confirms that the expected interaction was captured in at least one of them.
-     */
-    private static void assertDistributionMetricRecordsCount(DistributionMetric metric, long expectedMeasuresCount) {
-        long totalMeasuresCount = Arrays.stream(metric.value()).sum();
-        assertThat(
-                "Unexpected total measures count in distribution metric " + metric.name(),
-                totalMeasuresCount,
-                is(expectedMeasuresCount)
-        );
-    }
 }
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
index 723871f0586..76525174066 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.pagememory.persistence.replacement;
 
+import static org.apache.ignite.internal.metrics.MetricMatchers.hasMetric;
+import static org.apache.ignite.internal.metrics.MetricMatchers.hasValue;
 import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
 import static org.apache.ignite.internal.pagememory.persistence.PersistentPageMemoryMetricSource.DIRTY_PAGES;
 import static org.apache.ignite.internal.pagememory.persistence.PersistentPageMemoryMetricSource.LOADED_PAGES;
@@ -36,7 +38,6 @@ import static org.apache.ignite.internal.util.GridUnsafe.freeBuffer;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -60,7 +61,6 @@ import org.apache.ignite.internal.configuration.testframework.ConfigurationExten
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory;
 import org.apache.ignite.internal.lang.RunnableX;
-import org.apache.ignite.internal.metrics.LongMetric;
 import org.apache.ignite.internal.metrics.MetricSet;
 import org.apache.ignite.internal.pagememory.DataRegion;
 import org.apache.ignite.internal.pagememory.TestDataRegion;
@@ -374,12 +374,7 @@ public abstract class AbstractPageReplacementTest extends IgniteAbstractTest {
     }
 
     private void assertMetricValue(String metricName, Matcher<Long> valueMatcher) {
-        LongMetric metric = metricSet.get(metricName);
-        assertThat(metric, is(notNullValue()));
-        assertThat(
-                metric.value(),
-                valueMatcher
-        );
+        assertThat(metricSet, hasMetric(metricName, hasValue(valueMatcher)));
     }
 
     private void createAndFillTestSimpleValuePage(long pageId) throws Exception {
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIoTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIoTest.java
index dcfcd8b8050..35b4c7de798 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIoTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIoTest.java
@@ -20,6 +20,9 @@ package org.apache.ignite.internal.pagememory.persistence.store;
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.READ;
 import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.ignite.internal.metrics.MetricMatchers.hasMeasurementsCount;
+import static org.apache.ignite.internal.metrics.MetricMatchers.hasMetric;
+import static org.apache.ignite.internal.metrics.MetricMatchers.hasValue;
 import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
 import static org.apache.ignite.internal.pagememory.persistence.store.FilePageStore.VERSION_1;
 import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.createPageByteBuffer;
@@ -29,20 +32,16 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
-import java.util.Arrays;
 import org.apache.ignite.internal.fileio.FileIo;
 import org.apache.ignite.internal.fileio.FileIoFactory;
 import org.apache.ignite.internal.fileio.MeteredFileIoFactory;
 import org.apache.ignite.internal.fileio.RandomAccessFileIo;
 import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory;
-import org.apache.ignite.internal.metrics.DistributionMetric;
-import org.apache.ignite.internal.metrics.LongMetric;
 import org.apache.ignite.internal.metrics.MetricSet;
 import org.apache.ignite.internal.pagememory.metrics.CollectionMetricSource;
 import org.apache.ignite.internal.pagememory.persistence.PageMemoryIoMetrics;
@@ -145,7 +144,7 @@ public class FilePageStoreIoTest extends AbstractFilePageStoreIoTest {
 
             // Verify write metrics were recorded - 1 write of header + 1 write of page
             assertMetricValue(metricSet, PageMemoryIoMetrics.TOTAL_BYTES_WRITTEN,  PAGE_SIZE * 2);
-            assertDistributionMetricRecordsCount(metricSet, PageMemoryIoMetrics.WRITES_TIME, 2L);
+            assertDistributionMetricFromSet(metricSet, PageMemoryIoMetrics.WRITES_TIME, 2L);
 
             // Perform read operation
             long pageOff = filePageStoreIo.pageOffset(pageId);
@@ -154,7 +153,7 @@ public class FilePageStoreIoTest extends AbstractFilePageStoreIoTest {
 
             // Verify read metrics were recorded
             assertMetricValue(metricSet, PageMemoryIoMetrics.TOTAL_BYTES_READ,  PAGE_SIZE);
-            assertDistributionMetricRecordsCount(metricSet, PageMemoryIoMetrics.READS_TIME, 1L);
+            assertDistributionMetricFromSet(metricSet, PageMemoryIoMetrics.READS_TIME, 1L);
         }
     }
 
@@ -176,28 +175,16 @@ public class FilePageStoreIoTest extends AbstractFilePageStoreIoTest {
     }
 
     private static void assertMetricValue(MetricSet metrics, String metricName, long value) {
-        LongMetric metric = metrics.get(metricName);
-
-        assertNotNull(metric, "Metric not found: " + metricName);
-        assertEquals(value, metric.value(), metricName);
+        assertThat(metrics, hasMetric(
+                metricName,
+                hasValue(is(value))
+        ));
     }
 
-    /**
-     * Verifies that the specified distribution metric has recorded the expected total number of measurements.
-     *
-     * <p>
-     * Rather than checking individual histogram buckets, this method aggregates all recorded measurements across every bucket
-     * and confirms that the expected interaction was captured in at least one of them.
-     */
-    private static void assertDistributionMetricRecordsCount(MetricSet metrics, String metricName, long expectedMeasuresCount) {
-        DistributionMetric metric = metrics.get(metricName);
-        assertNotNull(metric, metricName);
-
-        long totalMeasuresCount = Arrays.stream(metric.value()).sum();
-        assertThat(
-                "Unexpected total measures count in distribution metric " + metric.name(),
-                totalMeasuresCount,
-                is(expectedMeasuresCount)
-        );
+    private static void assertDistributionMetricFromSet(MetricSet metrics, String metricName, long expectedMeasuresCount) {
+        assertThat(metrics, hasMetric(
+                metricName,
+                hasMeasurementsCount(expectedMeasuresCount)
+        ));
     }
 }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
index 88431a0cda9..15dd79642d7 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.storage.pagememory.configuration.schema.Persis
 import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryProfileView;
 import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
 import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineExtensionConfiguration;
+import org.apache.ignite.internal.storage.pagememory.mv.RunConsistentlyMetrics;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.jetbrains.annotations.Nullable;
 
@@ -99,7 +100,7 @@ public class PersistentPageMemoryStorageEngine extends AbstractPageMemoryStorage
 
     private CollectionMetricSource checkpointMetricSource;
 
-    private PersistentPageMemoryStorageMetricSource storageMetricSource;
+    private CollectionMetricSource storageMetricSource;
 
     private final StorageConfiguration storageConfig;
 
@@ -130,6 +131,8 @@ public class PersistentPageMemoryStorageEngine extends AbstractPageMemoryStorage
     /** For unspecified tasks, i.e. throttling log. */
     private final ExecutorService commonExecutorService;
 
+    private RunConsistentlyMetrics runConsistentlyMetrics;
+
     /**
      * Constructor.
      *
@@ -253,10 +256,12 @@ public class PersistentPageMemoryStorageEngine extends AbstractPageMemoryStorage
 
         destructionExecutor = executor;
 
-        storageMetricSource = new PersistentPageMemoryStorageMetricSource("storage." + ENGINE_NAME);
+        storageMetricSource = new CollectionMetricSource("storage." + ENGINE_NAME, "storage", null);
 
         PersistentPageMemoryStorageMetrics.initMetrics(storageMetricSource, filePageStoreManager);
 
+        runConsistentlyMetrics = new RunConsistentlyMetrics(storageMetricSource);
+
         metricManager.registerSource(checkpointMetricSource);
         metricManager.registerSource(storageMetricSource);
         metricManager.registerSource(ioMetricSource);
@@ -332,7 +337,8 @@ public class PersistentPageMemoryStorageEngine extends AbstractPageMemoryStorage
                 this,
                 dataRegion,
                 destructionExecutor,
-                failureManager
+                failureManager,
+                runConsistentlyMetrics
         );
 
         dataRegion.addTableStorage(tableStorage);
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetricSource.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetricSource.java
deleted file mode 100644
index 0d813d0af98..00000000000
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetricSource.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ignite.internal.metrics.Metric;
-import org.apache.ignite.internal.metrics.MetricSet;
-import org.apache.ignite.internal.metrics.MetricSource;
-import org.jetbrains.annotations.Nullable;
-
-/** Persistent page memory storage metric source. */
-class PersistentPageMemoryStorageMetricSource implements MetricSource {
-    private final String name;
-
-    /** Metrics map. Only modified in {@code synchronized} context. */
-    private final Map<String, Metric> metrics = new HashMap<>();
-
-    /** Enabled flag. Only modified in {@code synchronized} context. */
-    private boolean enabled;
-
-    /**
-     * Constructor.
-     *
-     * @param name Metric set name.
-     */
-    PersistentPageMemoryStorageMetricSource(String name) {
-        this.name = name;
-    }
-
-    @Override
-    public String name() {
-        return name;
-    }
-
-    @Override
-    public @Nullable String group() {
-        return "storage";
-    }
-
-    /** Adds metric to the source. */
-    synchronized <T extends Metric> T addMetric(T metric) {
-        assert !enabled : "Cannot add metrics when source is enabled";
-
-        metrics.put(metric.name(), metric);
-
-        return metric;
-    }
-
-    @Override
-    public synchronized @Nullable MetricSet enable() {
-        if (enabled) {
-            return null;
-        }
-
-        enabled = true;
-
-        return new MetricSet(name, description(), group(), Map.copyOf(metrics));
-    }
-
-    @Override
-    public synchronized void disable() {
-        enabled = false;
-    }
-
-    @Override
-    public synchronized boolean enabled() {
-        return enabled;
-    }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetrics.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetrics.java
index 13a6bfbe31a..6a8d847b476 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetrics.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetrics.java
@@ -21,6 +21,7 @@ import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metrics.LongGauge;
+import org.apache.ignite.internal.pagememory.metrics.CollectionMetricSource;
 import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
 import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
 import org.apache.ignite.internal.pagememory.persistence.store.GroupPageStoresMap.GroupPartitionPageStore;
@@ -31,7 +32,7 @@ class PersistentPageMemoryStorageMetrics {
 
     /** Initializes metrics in the given metric source. */
     static void initMetrics(
-            PersistentPageMemoryStorageMetricSource source,
+            CollectionMetricSource source,
             FilePageStoreManager filePageStoreManager
     ) {
         source.addMetric(new LongGauge(
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index 7385839d8c7..426aecca791 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
 import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
 import org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage;
 import org.apache.ignite.internal.storage.pagememory.mv.PersistentPageMemoryMvPartitionStorage;
+import org.apache.ignite.internal.storage.pagememory.mv.RunConsistentlyMetrics;
 import org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
 import org.apache.ignite.internal.storage.pagememory.mv.gc.GcQueue;
 import org.jetbrains.annotations.Nullable;
@@ -65,6 +66,8 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
 
     private final FailureProcessor failureProcessor;
 
+    private final RunConsistentlyMetrics runConsistentlyMetrics;
+
     /**
      * Constructor.
      *
@@ -72,7 +75,9 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
      * @param indexDescriptorSupplier Index descriptor supplier.
      * @param engine Storage engine instance.
      * @param dataRegion Data region for the table.
+     * @param destructionExecutor Executor service for destruction tasks.
      * @param failureProcessor Failure processor.
+     * @param runConsistentlyMetrics RunConsistently metrics.
      */
     public PersistentPageMemoryTableStorage(
             StorageTableDescriptor tableDescriptor,
@@ -80,7 +85,8 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
             PersistentPageMemoryStorageEngine engine,
             PersistentPageMemoryDataRegion dataRegion,
             ExecutorService destructionExecutor,
-            FailureProcessor failureProcessor
+            FailureProcessor failureProcessor,
+            RunConsistentlyMetrics runConsistentlyMetrics
     ) {
         super(tableDescriptor, indexDescriptorSupplier);
 
@@ -88,6 +94,7 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
         this.dataRegion = dataRegion;
         this.destructionExecutor = destructionExecutor;
         this.failureProcessor = failureProcessor;
+        this.runConsistentlyMetrics = runConsistentlyMetrics;
     }
 
     @Override
@@ -144,7 +151,8 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
                     indexMetaTree,
                     gcQueue,
                     destructionExecutor,
-                    failureProcessor
+                    failureProcessor,
+                    runConsistentlyMetrics
             );
         });
     }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index 8e46826f286..8b10f12744b 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.storage.util.LocalLocker;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Implementation of {@link MvPartitionStorage} based on a {@link BplusTree} for persistent case.
@@ -99,6 +100,9 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
      */
     private final Object leaseInfoLock = new Object();
 
+    /** RunConsistently metrics. */
+    private final RunConsistentlyMetrics runConsistentlyMetrics;
+
     /**
      * Constructor.
      *
@@ -110,6 +114,7 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
      * @param indexMetaTree Tree that contains SQL indexes' metadata.
      * @param gcQueue Garbage collection queue.
      * @param failureProcessor Failure processor.
+     * @param runConsistentlyMetrics Metric source for runConsistently operations.
      */
     public PersistentPageMemoryMvPartitionStorage(
             PersistentPageMemoryTableStorage tableStorage,
@@ -120,7 +125,8 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
             IndexMetaTree indexMetaTree,
             GcQueue gcQueue,
             ExecutorService destructionExecutor,
-            FailureProcessor failureProcessor
+            FailureProcessor failureProcessor,
+            RunConsistentlyMetrics runConsistentlyMetrics
     ) {
         super(
                 partitionId,
@@ -166,6 +172,8 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
         );
 
         leaseInfo = leaseInfoFromMeta();
+
+        this.runConsistentlyMetrics = runConsistentlyMetrics;
     }
 
     /**
@@ -193,26 +201,44 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
             return busy(() -> {
                 throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
 
-                LocalLocker locker0 = new PersistentPageMemoryLocker();
-
-                checkpointTimeoutLock.checkpointReadLock();
+                boolean metricsEnabled = runConsistentlyMetrics.enabled();
+                long startTime = metricsEnabled ? System.nanoTime() : 0;
 
-                THREAD_LOCAL_LOCKER.set(locker0);
+                if (metricsEnabled) {
+                    runConsistentlyMetrics.onRunConsistentlyStarted();
+                }
 
                 try {
-                    return closure.execute(locker0);
+                    return executeRunConsistently(closure);
                 } finally {
-                    THREAD_LOCAL_LOCKER.set(null);
-
-                    // Can't throw any exception, it's safe to do it without try/finally.
-                    locker0.unlockAll();
-
-                    checkpointTimeoutLock.checkpointReadUnlock();
+                    if (metricsEnabled) {
+                        runConsistentlyMetrics.recordRunConsistentlyDuration(System.nanoTime() - startTime);
+                        runConsistentlyMetrics.onRunConsistentlyFinished();
+                    }
                 }
             });
         }
     }
 
+    private <V> V executeRunConsistently(WriteClosure<V> closure) {
+        LocalLocker locker0 = new PersistentPageMemoryLocker();
+
+        checkpointTimeoutLock.checkpointReadLock();
+
+        THREAD_LOCAL_LOCKER.set(locker0);
+
+        try {
+            return closure.execute(locker0);
+        } finally {
+            THREAD_LOCAL_LOCKER.set(null);
+
+            // Can't throw any exception, it's safe to do it without try/finally.
+            locker0.unlockAll();
+
+            checkpointTimeoutLock.checkpointReadUnlock();
+        }
+    }
+
     @Override
     public CompletableFuture<Void> flush(boolean trigger) {
         return busy(() -> {
@@ -437,6 +463,12 @@ public class PersistentPageMemoryMvPartitionStorage 
extends AbstractPageMemoryMv
         return wiHeadLock.isHeldByCurrentThread();
     }
 
+    /**
+     * Exposes the {@code runConsistently} metrics held by this storage so that tests can inspect them.
+     *
+     * @return Metrics instance used by this storage.
+     */
+    @TestOnly
+    RunConsistentlyMetrics runConsistentlyMetrics() {
+        return this.runConsistentlyMetrics;
+    }
+
     @Override
     public @Nullable LeaseInfo leaseInfo() {
         return busy(() -> {
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RunConsistentlyMetrics.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RunConsistentlyMetrics.java
new file mode 100644
index 00000000000..6cc8fce77f6
--- /dev/null
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RunConsistentlyMetrics.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.ignite.internal.metrics.DistributionMetric;
+import org.apache.ignite.internal.metrics.LongAdderMetric;
+import org.apache.ignite.internal.metrics.LongGauge;
+import org.apache.ignite.internal.metrics.LongMetric;
+import org.apache.ignite.internal.pagememory.metrics.CollectionMetricSource;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Metrics for runConsistently operation.
+ *
+ * <p>Tracks runConsistently closure execution performance including duration,
+ * active call count, and total invocation count.
+ *
+ * <p>Three metrics are registered with the supplied metric source: a duration distribution, a counter of started
+ * invocations and a gauge of currently active invocations. The gauge is derived from the started counter and an
+ * internal counter of finished invocations, which is not registered as a metric of its own.
+ */
+public class RunConsistentlyMetrics {
+    /** Histogram bucket bounds for runConsistently duration in nanoseconds. */
+    private static final long[] RUN_CONSISTENTLY_DURATION_BOUNDS = {
+            TimeUnit.MICROSECONDS.toNanos(10),
+            TimeUnit.MICROSECONDS.toNanos(100),
+            TimeUnit.MILLISECONDS.toNanos(1),
+            TimeUnit.MILLISECONDS.toNanos(10),
+            TimeUnit.MILLISECONDS.toNanos(100),
+            TimeUnit.SECONDS.toNanos(1),
+            TimeUnit.SECONDS.toNanos(10),
+    };
+
+    /** Distribution of recorded runConsistently durations. */
+    private final DistributionMetric runConsistentlyDuration;
+
+    /** Number of invocations reported via {@link #onRunConsistentlyStarted()}. */
+    private final LongAdderMetric runConsistentlyStarted;
+
+    /** Number of invocations reported via {@link #onRunConsistentlyFinished()}; only feeds the active count gauge. */
+    private final LongAdder exitCount = new LongAdder();
+
+    /** Gauge computed as started invocations minus finished invocations. */
+    private final LongGauge runConsistentlyActiveCount;
+
+    /** Source the metrics are registered with; also consulted by {@link #enabled()}. */
+    private final CollectionMetricSource metricSource;
+
+    /**
+     * Constructor.
+     *
+     * @param metricSource Metric source to register metrics with.
+     */
+    public RunConsistentlyMetrics(CollectionMetricSource metricSource) {
+        this.metricSource = metricSource;
+
+        runConsistentlyDuration = metricSource.addMetric(new DistributionMetric(
+                "RunConsistentlyDuration",
+                "Time spent in runConsistently closures in nanoseconds.",
+                RUN_CONSISTENTLY_DURATION_BOUNDS
+        ));
+
+        runConsistentlyStarted = metricSource.addMetric(new LongAdderMetric(
+                "RunConsistentlyStarted",
+                "Total number of runConsistently invocations started."
+        ));
+
+        runConsistentlyActiveCount = metricSource.addMetric(new LongGauge(
+                "RunConsistentlyActiveCount",
+                "Current number of active runConsistently calls.",
+                this::activeCount
+        ));
+    }
+
+    /**
+     * Returns {@code true} if the metric source is enabled.
+     */
+    public boolean enabled() {
+        return metricSource.enabled();
+    }
+
+    /**
+     * Records the duration of a runConsistently closure execution in nanoseconds.
+     *
+     * @param durationNanos Measured duration in nanoseconds.
+     */
+    public void recordRunConsistentlyDuration(long durationNanos) {
+        runConsistentlyDuration.add(durationNanos);
+    }
+
+    /**
+     * Records the start of a runConsistently invocation.
+     *
+     * <p>Every call must eventually be paired with {@link #onRunConsistentlyFinished()}, otherwise the active count
+     * gauge never returns to its previous value.
+     */
+    public void onRunConsistentlyStarted() {
+        runConsistentlyStarted.increment();
+    }
+
+    /**
+     * Records the completion of a runConsistently invocation.
+     */
+    public void onRunConsistentlyFinished() {
+        exitCount.increment();
+    }
+
+    /**
+     * Computes the number of invocations that have started but not finished yet.
+     *
+     * <p>The two counters cannot be read atomically. Completions are read first on purpose: an invocation reports its
+     * start before its completion, so every completion observed here has its start visible to the later read, and the
+     * difference never drops below zero. Reading in the opposite order would let an invocation that starts and
+     * finishes between the two reads push the gauge to a transient negative value.
+     */
+    private long activeCount() {
+        long finished = exitCount.sum();
+
+        return runConsistentlyStarted.value() - finished;
+    }
+
+    /** Returns the runConsistently duration metric for testing. */
+    @TestOnly
+    DistributionMetric runConsistentlyDuration() {
+        return runConsistentlyDuration;
+    }
+
+    /** Returns the runConsistently started count metric for testing. */
+    @TestOnly
+    LongAdderMetric runConsistentlyStarted() {
+        return runConsistentlyStarted;
+    }
+
+    /** Returns the runConsistently active count metric for testing. */
+    @TestOnly
+    LongMetric runConsistentlyActiveCount() {
+        return runConsistentlyActiveCount;
+    }
+}
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetricsTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetricsTest.java
index 434dce47e91..411c615f919 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetricsTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetricsTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.metrics.Metric;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.metrics.MetricSet;
 import org.apache.ignite.internal.metrics.TestMetricManager;
+import org.apache.ignite.internal.pagememory.metrics.CollectionMetricSource;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
 import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
 import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
@@ -45,7 +46,7 @@ import org.junit.jupiter.api.Test;
 public class PersistentPageMemoryStorageMetricsTest extends 
BaseIgniteAbstractTest {
     private final MetricManager metricManager = new TestMetricManager();
 
-    private final PersistentPageMemoryStorageMetricSource metricSource = new 
PersistentPageMemoryStorageMetricSource("test");
+    private final CollectionMetricSource metricSource = new 
CollectionMetricSource("test", "storage", null);
 
     private final FilePageStoreManager filePageStoreManager = 
mock(FilePageStoreManager.class);
 
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/FailedCheckpointTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/FailedCheckpointTest.java
index 9a6cbe78c49..9513fbe8266 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/FailedCheckpointTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/FailedCheckpointTest.java
@@ -232,13 +232,20 @@ public class FailedCheckpointTest extends 
BaseMvStoragesTest {
     }
 
     private MvTableStorage createMvTableStorage() {
+        CollectionMetricSource metricSource = new CollectionMetricSource(
+                "storage.test.consistency",
+                "storage",
+                null
+        );
+
         var tableStorage = new PersistentPageMemoryTableStorage(
                 new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_STORAGE_PROFILE),
                 mock(StorageIndexDescriptorSupplier.class),
                 mockEngine,
                 dataRegion,
                 destructionExecutor,
-                mock(FailureManager.class)
+                mock(FailureManager.class),
+                new RunConsistentlyMetrics(metricSource)
         );
 
         dataRegion.addTableStorage(tableStorage);
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
index 03c1f2ffa1c..0cd71d91939 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.storage.pagememory.mv;
 
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static 
org.apache.ignite.internal.metrics.MetricMatchers.hasMeasurementsCount;
+import static org.apache.ignite.internal.metrics.MetricMatchers.hasValue;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
 import static org.apache.ignite.internal.schema.BinaryRowMatcher.isRow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -45,7 +47,8 @@ import 
org.apache.ignite.internal.configuration.SystemLocalConfiguration;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.LongMetric;
+import org.apache.ignite.internal.metrics.TestMetricManager;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageClosedException;
@@ -91,7 +94,7 @@ class PersistentPageMemoryMvPartitionStorageTest extends 
AbstractPageMemoryMvPar
 
         engine = new PersistentPageMemoryStorageEngine(
                 "test",
-                mock(MetricManager.class),
+                new TestMetricManager(),
                 storageConfig,
                 systemConfig,
                 ioRegistry,
@@ -537,4 +540,71 @@ class PersistentPageMemoryMvPartitionStorageTest extends 
AbstractPageMemoryMvPar
             assertThrows(StorageClosedException.class, cursor::next);
         }
     }
+
+    /** Checks that two sequential top-level runConsistently calls are each reflected in all three metrics. */
+    @Test
+    void verifyRunConsistentlyMetrics() {
+        PersistentPageMemoryMvPartitionStorage partitionStorage = (PersistentPageMemoryMvPartitionStorage) storage;
+        RunConsistentlyMetrics runMetrics = partitionStorage.runConsistentlyMetrics();
+
+        // Nothing has been executed yet, so every metric is expected to be at zero.
+        assertThat(runMetrics.runConsistentlyDuration(), hasMeasurementsCount(0L));
+        assertMetricValue(runMetrics.runConsistentlyActiveCount(), 0);
+        assertMetricValue(runMetrics.runConsistentlyStarted(), 0);
+
+        // First invocation: while the closure runs, exactly one call is started and active.
+        storage.runConsistently(ignoredLocker -> {
+            assertMetricValue(runMetrics.runConsistentlyActiveCount(), 1);
+            assertMetricValue(runMetrics.runConsistentlyStarted(), 1);
+
+            return null;
+        });
+
+        // After the first invocation: one duration sample, nothing active, one call started in total.
+        assertThat(runMetrics.runConsistentlyDuration(), hasMeasurementsCount(1L));
+        assertMetricValue(runMetrics.runConsistentlyActiveCount(), 0);
+        assertMetricValue(runMetrics.runConsistentlyStarted(), 1);
+
+        // Second invocation: the active count goes up again for the duration of the closure.
+        storage.runConsistently(ignoredLocker -> {
+            assertMetricValue(runMetrics.runConsistentlyActiveCount(), 1);
+
+            return null;
+        });
+
+        // After the second invocation: totals have advanced, the active count is back to zero.
+        assertThat(runMetrics.runConsistentlyDuration(), hasMeasurementsCount(2L));
+        assertMetricValue(runMetrics.runConsistentlyActiveCount(), 0);
+        assertMetricValue(runMetrics.runConsistentlyStarted(), 2);
+    }
+
+    /** Checks that a runConsistently call nested inside another one leaves the metrics of the outer call untouched. */
+    @Test
+    void verifyNestedRunConsistentlyDoesNotDoubleCountMetrics() {
+        PersistentPageMemoryMvPartitionStorage partitionStorage = (PersistentPageMemoryMvPartitionStorage) storage;
+        RunConsistentlyMetrics runMetrics = partitionStorage.runConsistentlyMetrics();
+
+        storage.runConsistently(ignoredOuterLocker -> {
+            assertMetricValue(runMetrics.runConsistentlyActiveCount(), 1);
+            assertMetricValue(runMetrics.runConsistentlyStarted(), 1);
+
+            // The nested call takes the fast path, so the counters must stay exactly as the outer call left them.
+            storage.runConsistently(ignoredInnerLocker -> {
+                assertMetricValue(runMetrics.runConsistentlyActiveCount(), 1);
+                assertMetricValue(runMetrics.runConsistentlyStarted(), 1);
+
+                return null;
+            });
+
+            return null;
+        });
+
+        // Only the outer call is accounted for: one duration sample, one start, nothing left active.
+        assertThat(runMetrics.runConsistentlyDuration(), hasMeasurementsCount(1L));
+        assertMetricValue(runMetrics.runConsistentlyActiveCount(), 0);
+        assertMetricValue(runMetrics.runConsistentlyStarted(), 1);
+    }
+
+    /** Asserts that the given metric currently reports exactly {@code value}. */
+    private static void assertMetricValue(LongMetric metric, long value) {
+        assertThat(
+                metric,
+                hasValue(is(value))
+        );
+    }
 }


Reply via email to