This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new dd11fa66d21 IGNITE-27688 Add more aipersist region metrics (#7687)
dd11fa66d21 is described below

commit dd11fa66d2163de04996dd57f18a1bdc28d88e83
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Tue Mar 3 17:21:29 2026 +0200

    IGNITE-27688 Add more aipersist region metrics (#7687)
    
    * TotalEmptySize
    * TotalDataSize
    * PagesFillFactor
---
 .../index/IndexAvailabilityControllerTest.java     |   4 +-
 .../ignite/internal/index/IndexBuilderTest.java    |   6 +-
 .../ignite/internal/metrics/TestMetricManager.java |  77 +----
 ...InterruptedRaftSnapshotStorageRecoveryTest.java |  23 +-
 .../metrics/logstorage/LogStorageMetricsTest.java  |  11 +-
 modules/storage-page-memory/build.gradle           |   1 +
 .../PersistentDataRegionMetricsCalculator.java     | 142 ++++++++
 .../pagememory/PersistentPageMemoryDataRegion.java |  97 +++---
 .../mv/PersistentPageMemoryMvPartitionStorage.java |   5 +
 .../PersistentDataRegionMetricsCalculatorTest.java | 369 +++++++++++++++++++++
 .../PersistentPageMemoryDataRegionMetricsTest.java | 246 ++++++++++++++
 .../PersistentPageMemoryMvTableStorageTest.java    | 112 +------
 .../PersistentPageMemoryStorageEngineTest.java     |  84 ++++-
 .../apache/ignite/distributed/ItLockTableTest.java |   4 +-
 ...xDistributedTestSingleNodeNoCleanupMessage.java |   4 +-
 15 files changed, 914 insertions(+), 271 deletions(-)

diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
index 45fdab88ae3..b25a94ecb03 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
@@ -61,7 +61,7 @@ import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
-import org.apache.ignite.internal.metrics.TestMetricManager;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import 
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
 import org.apache.ignite.internal.replicator.ReplicaService;
@@ -104,7 +104,7 @@ public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
             new NoOpFailureManager(),
             new CommittedFinalTransactionStateResolver(),
             indexMetaStorage,
-            new TestMetricManager()
+            new NoOpMetricManager()
     );
 
     private final IndexAvailabilityController indexAvailabilityController = 
new IndexAvailabilityController(
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
index 5d1722a7a7b..22a27443c41 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
@@ -51,7 +51,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.metrics.TestMetricManager;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import 
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
@@ -100,15 +100,13 @@ public class IndexBuilderTest extends BaseIgniteAbstractTest {
 
     private final PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTime = mock(PendingComparableValuesTracker.class);
 
-    private final TestMetricManager metricManager = new TestMetricManager();
-
     private final IndexBuilder indexBuilder = new IndexBuilder(
             executorService,
             replicaService,
             new NoOpFailureManager(),
             new CommittedFinalTransactionStateResolver(),
             indexMetaStorage,
-            metricManager
+            new NoOpMetricManager()
     );
 
     @BeforeEach
diff --git a/modules/metrics/src/testFixtures/java/org/apache/ignite/internal/metrics/TestMetricManager.java b/modules/metrics/src/testFixtures/java/org/apache/ignite/internal/metrics/TestMetricManager.java
index 0f88d8b2a41..034fbd4f054 100644
--- a/modules/metrics/src/testFixtures/java/org/apache/ignite/internal/metrics/TestMetricManager.java
+++ b/modules/metrics/src/testFixtures/java/org/apache/ignite/internal/metrics/TestMetricManager.java
@@ -19,83 +19,30 @@ package org.apache.ignite.internal.metrics;
 
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
-import java.util.Collection;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.manager.ComponentContext;
-import org.apache.ignite.internal.metrics.exporters.MetricExporter;
-import org.jetbrains.annotations.Nullable;
 
 /** Test implementation without exporters. */
-public class TestMetricManager implements MetricManager {
-    private final MetricRegistry registry = new MetricRegistry();
-
+public class TestMetricManager extends AbstractMetricManager {
     @Override
     public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
         return nullCompletedFuture();
     }
 
-    @Override
-    public CompletableFuture<Void> stopAsync(ComponentContext 
componentContext) {
-        return nullCompletedFuture();
-    }
+    /** Returns the metric for the arguments. */
+    public <T extends Metric> T metric(String sourceName, String metricName) {
+        MetricSet metrics = metricSnapshot().metrics().get(sourceName);
 
-    @Override
-    public void registerSource(MetricSource src) {
-        registry.registerSource(src);
-    }
-
-    @Override
-    public void unregisterSource(MetricSource src) {
-        registry.unregisterSource(src);
-    }
-
-    @Override
-    public void unregisterSource(String srcName) {
-        registry.unregisterSource(srcName);
-    }
-
-    @Override
-    public MetricSet enable(MetricSource src) {
-        return registry.enable(src);
-    }
-
-    @Override
-    public MetricSet enable(String srcName) {
-        return registry.enable(srcName);
-    }
-
-    @Override
-    public void disable(MetricSource src) {
-        registry.disable(src);
-    }
-
-    @Override
-    public void disable(String srcName) {
-        registry.disable(srcName);
-    }
-
-    @Override
-    public MetricSnapshot metricSnapshot() {
-        return registry.snapshot();
-    }
-
-    @Override
-    public Collection<MetricSource> metricSources() {
-        return registry.metricSources();
-    }
-
-    @Override
-    public Collection<MetricExporter> enabledExporters() {
-        return List.of();
-    }
+        if (metrics == null) {
+            throw new IllegalArgumentException("Metric source not found: " + 
sourceName);
+        }
 
-    /** Returns the metric for the arguments if it exists. */
-    public @Nullable Metric metric(String sourceName, String metricName) {
-        MetricSnapshot snapshot = metricSnapshot();
+        T metric = metrics.get(metricName);
 
-        MetricSet metrics = snapshot.metrics().get(sourceName);
+        if (metric == null) {
+            throw new IllegalArgumentException("Metric not found: " + 
metricName);
+        }
 
-        return metrics == null ? null : metrics.get(metricName);
+        return metric;
     }
 }
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItInterruptedRaftSnapshotStorageRecoveryTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItInterruptedRaftSnapshotStorageRecoveryTest.java
index 1df7c6fff7b..c6244232ded 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItInterruptedRaftSnapshotStorageRecoveryTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItInterruptedRaftSnapshotStorageRecoveryTest.java
@@ -47,8 +47,7 @@ import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.ComponentContext;
-import org.apache.ignite.internal.metrics.MetricManager;
-import org.apache.ignite.internal.metrics.TestMetricManager;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
@@ -56,8 +55,6 @@ import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageEngine;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
-import org.apache.ignite.internal.storage.index.StorageIndexDescriptor;
-import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
 import 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
@@ -70,7 +67,6 @@ import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.error.RaftError;
 import org.apache.ignite.table.KeyValueView;
-import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -192,23 +188,14 @@ class ItInterruptedRaftSnapshotStorageRecoveryTest extends ClusterPerTestIntegra
         assertTrue(Files.exists(storagePath));
         assertTrue(Files.isDirectory(storagePath));
 
-        var metricManager = new TestMetricManager();
-
-        StorageEngine engine = createPersistentPageMemoryEngine(storagePath, 
metricManager);
+        StorageEngine engine = createPersistentPageMemoryEngine(storagePath);
 
         engine.start();
 
         try {
-            assertThat(metricManager.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
-
             StorageTableDescriptor tableDescriptor = storageTableDescriptor();
 
-            MvTableStorage tableStorage = 
engine.createMvTable(tableDescriptor, new StorageIndexDescriptorSupplier() {
-                @Override
-                public @Nullable StorageIndexDescriptor get(int indexId) {
-                    return null;
-                }
-            });
+            MvTableStorage tableStorage = 
engine.createMvTable(tableDescriptor, indexId -> null);
             assertThat(tableStorage.createMvPartition(0), 
willCompleteSuccessfully());
             assertThat(tableStorage.startRebalancePartition(0), 
willCompleteSuccessfully());
 
@@ -219,14 +206,14 @@ class ItInterruptedRaftSnapshotStorageRecoveryTest extends ClusterPerTestIntegra
         }
     }
 
-    private StorageEngine createPersistentPageMemoryEngine(Path storagePath, 
MetricManager metricManager) {
+    private StorageEngine createPersistentPageMemoryEngine(Path storagePath) {
         var ioRegistry = new PageIoRegistry();
 
         ioRegistry.loadFromServiceLoader();
 
         return new PersistentPageMemoryStorageEngine(
                 "test",
-                metricManager,
+                new NoOpMetricManager(),
                 storageConfig,
                 systemConfig,
                 ioRegistry,
diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricsTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricsTest.java
index 67bc7cd5896..ae4bf37db4f 100644
--- a/modules/runner/src/test/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricsTest.java
+++ b/modules/runner/src/test/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricsTest.java
@@ -25,8 +25,6 @@ import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.isA;
-import static org.hamcrest.Matchers.notNullValue;
 
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
@@ -34,7 +32,6 @@ import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metrics.LongGauge;
-import org.apache.ignite.internal.metrics.Metric;
 import org.apache.ignite.internal.metrics.TestMetricManager;
 import org.apache.ignite.internal.raft.storage.LogStorageManager;
 import 
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageManagerCreator;
@@ -148,13 +145,9 @@ class LogStorageMetricsTest {
     }
 
     private void waitForLongGaugeValue(String metricName, Matcher<Long> 
valueMatcher) {
-        Metric metric = metricManager.metric(LogStorageMetricSource.NAME, 
metricName);
-        assertThat(metric, isA(LongGauge.class));
-        LongGauge gauge = (LongGauge) metric;
+        LongGauge metric = metricManager.metric(LogStorageMetricSource.NAME, 
metricName);
 
-        assertThat(gauge, is(notNullValue()));
-
-        await().until(gauge::value, valueMatcher);
+        await().until(metric::value, valueMatcher);
     }
 
     @Test
diff --git a/modules/storage-page-memory/build.gradle b/modules/storage-page-memory/build.gradle
index 438f2a0af33..39d873f977a 100644
--- a/modules/storage-page-memory/build.gradle
+++ b/modules/storage-page-memory/build.gradle
@@ -49,6 +49,7 @@ dependencies {
     testImplementation project(':ignite-schema')
     testImplementation testFixtures(project(':ignite-core'))
     testImplementation testFixtures(project(':ignite-configuration'))
+    testImplementation testFixtures(project(':ignite-failure-handler'))
     testImplementation testFixtures(project(':ignite-storage-api'))
     testImplementation testFixtures(project(':ignite-schema'))
     testImplementation testFixtures(project(':ignite-page-memory'))
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentDataRegionMetricsCalculator.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentDataRegionMetricsCalculator.java
new file mode 100644
index 00000000000..e88ef1945d9
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentDataRegionMetricsCalculator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import java.util.Collection;
+import java.util.stream.Stream;
+import 
org.apache.ignite.internal.storage.pagememory.mv.PersistentPageMemoryMvPartitionStorage;
+
+/**
+ * Class containing methods to calculate various metrics related to an 
"aipersist" data region and tables in it.
+ */
+class PersistentDataRegionMetricsCalculator {
+    private final int pageSize;
+
+    PersistentDataRegionMetricsCalculator(int pageSize) {
+        this.pageSize = pageSize;
+    }
+
+    long totalAllocatedSize(Collection<PersistentPageMemoryTableStorage> 
tables) {
+        long pagesCount = 
tables.stream().mapToLong(PersistentDataRegionMetricsCalculator::tableAllocatedPagesCount).sum();
+
+        return pagesCount * pageSize;
+    }
+
+    long totalAllocatedSize(PersistentPageMemoryTableStorage table) {
+        return tableAllocatedPagesCount(table) * pageSize;
+    }
+
+    private static long 
tableAllocatedPagesCount(PersistentPageMemoryTableStorage table) {
+        return 
allPartitions(table).mapToLong(PersistentPageMemoryMvPartitionStorage::pageCount).sum();
+    }
+
+    long totalUsedSize(Collection<PersistentPageMemoryTableStorage> tables) {
+        long pagesCount = 
tables.stream().mapToLong(PersistentDataRegionMetricsCalculator::tableNonEmptyAllocatedPagesCount).sum();
+
+        return pagesCount * pageSize;
+    }
+
+    long totalUsedSize(PersistentPageMemoryTableStorage table) {
+        return tableNonEmptyAllocatedPagesCount(table) * pageSize;
+    }
+
+    private static long 
tableNonEmptyAllocatedPagesCount(PersistentPageMemoryTableStorage table) {
+        return allPartitions(table)
+                .mapToLong(partitionStorage -> partitionStorage.pageCount() - 
partitionStorage.emptyDataPageCountInFreeList())
+                .sum();
+    }
+
+    long totalEmptySize(Collection<PersistentPageMemoryTableStorage> tables) {
+        long pageCount = 
tables.stream().mapToLong(PersistentDataRegionMetricsCalculator::tableEmptyPagesCount).sum();
+
+        return pageCount * pageSize;
+    }
+
+    long totalEmptySize(PersistentPageMemoryTableStorage table) {
+        return tableEmptyPagesCount(table) * pageSize;
+    }
+
+    private static long tableEmptyPagesCount(PersistentPageMemoryTableStorage 
table) {
+        return 
allPartitions(table).mapToLong(PersistentPageMemoryMvPartitionStorage::emptyDataPageCountInFreeList).sum();
+    }
+
+    long totalDataSize(Collection<PersistentPageMemoryTableStorage> tables) {
+        return tables.stream().mapToLong(this::totalDataSize).sum();
+    }
+
+    long totalDataSize(PersistentPageMemoryTableStorage table) {
+        return 
allPartitions(table).mapToLong(this::partitionFilledSpaceBytes).sum();
+    }
+
+    private long 
partitionFilledSpaceBytes(PersistentPageMemoryMvPartitionStorage partition) {
+        int pagesCount = partition.pageCount();
+
+        int emptyPagesCount = partition.emptyDataPageCountInFreeList();
+
+        long pagesWithData = (long) pagesCount - emptyPagesCount;
+
+        return pagesWithData * pageSize - partition.freeSpaceInFreeList();
+    }
+
+    /**
+     * Returns the ratio of space occupied by user and system data to the size 
of all pages that contain this data.
+     *
+     * <p>This metric can help to determine how much space of a data page is 
occupied on average. Low fill factor can
+     * indicate that data pages are very fragmented (i.e. there is a lot of 
empty space across all data pages).
+     */
+    double pagesFillFactor(Collection<PersistentPageMemoryTableStorage> 
tables) {
+        // Number of bytes used by pages that contain at least some data.
+        long totalUsedSpaceBytes = totalUsedSize(tables);
+
+        if (totalUsedSpaceBytes == 0) {
+            return 0;
+        }
+
+        // Amount of free space in these pages.
+        long freeSpaceBytes = tables.stream()
+                .flatMap(PersistentDataRegionMetricsCalculator::allPartitions)
+                
.mapToLong(PersistentPageMemoryMvPartitionStorage::freeSpaceInFreeList)
+                .sum();
+
+        // Number of bytes that contain useful data.
+        long nonEmptySpaceBytes = totalUsedSpaceBytes - freeSpaceBytes;
+
+        return (double) nonEmptySpaceBytes / totalUsedSpaceBytes;
+    }
+
+    /** Same as {@link #pagesFillFactor(Collection)} but for a single table. */
+    double pagesFillFactor(PersistentPageMemoryTableStorage table) {
+        long totalUsedSpaceBytes = totalUsedSize(table);
+
+        if (totalUsedSpaceBytes == 0) {
+            return 0;
+        }
+
+        long freeSpaceBytes = allPartitions(table)
+                
.mapToLong(PersistentPageMemoryMvPartitionStorage::freeSpaceInFreeList)
+                .sum();
+
+        long nonEmptySpaceBytes = totalUsedSpaceBytes - freeSpaceBytes;
+
+        return (double) nonEmptySpaceBytes / totalUsedSpaceBytes;
+    }
+
+    private static Stream<PersistentPageMemoryMvPartitionStorage> 
allPartitions(PersistentPageMemoryTableStorage tableStorage) {
+        return tableStorage.mvPartitionStorages.getAll().stream();
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
index 2e496eb9004..acb2dd982c2 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.configuration.SystemPropertyView;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.DoubleGauge;
 import org.apache.ignite.internal.metrics.LongGauge;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.pagememory.DataRegion;
@@ -47,14 +48,12 @@ import org.apache.ignite.internal.pagememory.FullPageId;
 import 
org.apache.ignite.internal.pagememory.configuration.PersistentDataRegionConfiguration;
 import org.apache.ignite.internal.pagememory.configuration.ReplacementMode;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
-import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
 import org.apache.ignite.internal.pagememory.persistence.PageWriteTarget;
 import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import 
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemoryMetricSource;
 import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager;
 import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
-import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
 import 
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
 import 
org.apache.ignite.internal.pagememory.persistence.throttling.PagesWriteSpeedBasedThrottle;
 import 
org.apache.ignite.internal.pagememory.persistence.throttling.PagesWriteThrottlePolicy;
@@ -67,7 +66,6 @@ import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
 import 
org.apache.ignite.internal.storage.metrics.StorageEngineTablesMetricSource;
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryProfileConfiguration;
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryProfileView;
-import 
org.apache.ignite.internal.storage.pagememory.mv.PersistentPageMemoryMvPartitionStorage;
 import org.apache.ignite.internal.util.OffheapReadWriteLock;
 import org.jetbrains.annotations.VisibleForTesting;
 
@@ -117,6 +115,8 @@ public class PersistentPageMemoryDataRegion implements DataRegion<PersistentPage
 
     private final PersistentPageMemoryMetricSource metricSource;
 
+    private final PersistentDataRegionMetricsCalculator metricsCalculator;
+
     private final ConcurrentMap<Integer, PersistentPageMemoryTableStorage> 
tableStorages = new ConcurrentHashMap<>();
 
     /**
@@ -152,6 +152,7 @@ public class PersistentPageMemoryDataRegion implements DataRegion<PersistentPage
         this.checkpointManager = checkpointManager;
 
         metricSource = new PersistentPageMemoryMetricSource("storage." + 
ENGINE_NAME + "." + cfg.value().name());
+        metricsCalculator = new 
PersistentDataRegionMetricsCalculator(pageSize);
     }
 
     /**
@@ -458,13 +459,32 @@ public class PersistentPageMemoryDataRegion implements DataRegion<PersistentPage
     private void initMetrics() {
         metricSource.addMetric(new LongGauge(
                 "TotalAllocatedSize",
-                "Total size of allocated pages on disk in bytes.",
-                this::totalAllocatedPagesSizeOnDiskInBytes
+                String.format("Total size of all pages allocated by \"%s\" 
storage engine, in bytes.", ENGINE_NAME),
+                () -> 
metricsCalculator.totalAllocatedSize(tableStorages.values())
         ));
+
         metricSource.addMetric(new LongGauge(
                 "TotalUsedSize",
-                "Total size of non-empty allocated pages on disk in bytes.",
-                this::totalNonEmptyAllocatedPagesSizeOnDiskInBytes
+                String.format("Total size of all non-empty pages allocated by 
\"%s\" storage engine, in bytes.", ENGINE_NAME),
+                () -> metricsCalculator.totalUsedSize(tableStorages.values())
+        ));
+
+        metricSource.addMetric(new LongGauge(
+                "TotalEmptySize",
+                String.format("Total size of all empty pages allocated by 
\"%s\" storage engine, in bytes.", ENGINE_NAME),
+                () -> metricsCalculator.totalEmptySize(tableStorages.values())
+        ));
+
+        metricSource.addMetric(new LongGauge(
+                "TotalDataSize",
+                String.format("Total space occupied by data contained in pages 
allocated by \"%s\" storage engine, in bytes.", ENGINE_NAME),
+                () -> metricsCalculator.totalDataSize(tableStorages.values())
+        ));
+
+        metricSource.addMetric(new DoubleGauge(
+                "PagesFillFactor",
+                "Ratio of number of bytes occupied by data to the total number 
of bytes occupied by pages that contain this data.",
+                () -> metricsCalculator.pagesFillFactor(tableStorages.values())
         ));
     }
 
@@ -481,47 +501,38 @@ public class PersistentPageMemoryDataRegion implements DataRegion<PersistentPage
 
         metricSource.addMetric(new LongGauge(
                 "TotalAllocatedSize",
-                "Total size of all pages allocated by '" + ENGINE_NAME + "' 
storage engine for a given table, in bytes.",
-                () -> {
-                    long totalPages = tableStorage.mvPartitionStorages.stream()
-                            
.mapToLong(PersistentPageMemoryMvPartitionStorage::pageCount)
-                            .sum();
-
-                    return pageSize * totalPages;
-                }
+                String.format("Total size of all pages allocated by \"%s\" 
storage engine for a given table, in bytes.", ENGINE_NAME),
+                () -> metricsCalculator.totalAllocatedSize(tableStorage)
         ));
-    }
-
-    private long totalAllocatedPagesSizeOnDiskInBytes() {
-        long pageCount = 0;
-
-        for (PersistentPageMemoryTableStorage tableStorage : 
tableStorages.values()) {
-            for (PersistentPageMemoryMvPartitionStorage partitionStorage : 
tableStorage.mvPartitionStorages.getAll()) {
-                pageCount += 
allocatedPageCountOnDisk(tableStorage.getTableId(), 
partitionStorage.partitionId());
-            }
-        }
 
-        return pageCount * pageSize;
-    }
-
-    private long totalNonEmptyAllocatedPagesSizeOnDiskInBytes() {
-        long pageCount = 0;
-
-        for (PersistentPageMemoryTableStorage tableStorage : 
tableStorages.values()) {
-            for (PersistentPageMemoryMvPartitionStorage partitionStorage : 
tableStorage.mvPartitionStorages.getAll()) {
-                pageCount += 
allocatedPageCountOnDisk(tableStorage.getTableId(), 
partitionStorage.partitionId());
-
-                pageCount -= partitionStorage.emptyDataPageCountInFreeList();
-            }
-        }
+        metricSource.addMetric(new LongGauge(
+                "TotalUsedSize",
+                String.format("Total size of all non-empty pages allocated by 
\"%s\" storage engine for a given table, in bytes.",
+                        ENGINE_NAME),
+                () -> metricsCalculator.totalUsedSize(tableStorage)
+        ));
 
-        return pageCount * pageSize;
-    }
+        metricSource.addMetric(new LongGauge(
+                "TotalEmptySize",
+                String.format("Total size of all empty pages allocated by 
\"%s\" storage engine for a given table, in bytes.", ENGINE_NAME),
+                () -> metricsCalculator.totalEmptySize(tableStorage)
+        ));
 
-    private long allocatedPageCountOnDisk(int tableId, int partitionId) {
-        FilePageStore store = filePageStoreManager.getStore(new 
GroupPartitionId(tableId, partitionId));
+        metricSource.addMetric(new LongGauge(
+                "TotalDataSize",
+                String.format(
+                        "Total space occupied by data contained in pages 
allocated by \"%s\" storage engine for a given table, in bytes.",
+                        ENGINE_NAME
+                ),
+                () -> metricsCalculator.totalDataSize(tableStorage)
+        ));
 
-        return store == null ? 0 : store.pages();
+        metricSource.addMetric(new DoubleGauge(
+                "PagesFillFactor",
+                "Ratio of number of bytes occupied by data in a given table to 
the total number of bytes occupied by pages that "
+                        + "contain this data.",
+                () -> metricsCalculator.pagesFillFactor(tableStorage)
+        ));
     }
 
     @Override
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index 8b10f12744b..8619e537d4b 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -679,6 +679,11 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
         return renewableState.freeList().emptyDataPages();
     }
 
+    /** Returns the amount of free space (in bytes) in the partially filled 
pages. */
+    public long freeSpaceInFreeList() {
+        return renewableState.freeList().freeSpace();
+    }
+
     @Override
     public Cursor<RowId> scanWriteIntents() {
         return busy(() -> new WriteIntentsCursor(lockWriteIntentListHead()));
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentDataRegionMetricsCalculatorTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentDataRegionMetricsCalculatorTest.java
new file mode 100644
index 00000000000..ef1d1e89ba5
--- /dev/null
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentDataRegionMetricsCalculatorTest.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
+import 
org.apache.ignite.internal.storage.pagememory.mv.PersistentPageMemoryMvPartitionStorage;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+class PersistentDataRegionMetricsCalculatorTest extends BaseIgniteAbstractTest 
{
+    private static final int PAGE_SIZE = 4096;
+    private static final int TABLE_ID = 1;
+    private static final int MAX_PARTITIONS = 10;
+
+    private final PersistentDataRegionMetricsCalculator calculator = new 
PersistentDataRegionMetricsCalculator(PAGE_SIZE);
+
+    @Test
+    void testTotalAllocatedSizeEmptyTable() {
+        PersistentPageMemoryTableStorage table = createTable();
+
+        assertThat(calculator.totalAllocatedSize(table), is(0L));
+    }
+
+    @Test
+    void testTotalAllocatedSizeSinglePartition() {
+        PersistentPageMemoryTableStorage table = createTable();
+        addPartition(table, 0, 10, 3, 500L);
+
+        assertThat(calculator.totalAllocatedSize(table), is(10L * PAGE_SIZE));
+    }
+
+    @Test
+    void testTotalAllocatedSizeMultiplePartitions() {
+        PersistentPageMemoryTableStorage table = createTable();
+        addPartition(table, 0, 5, 1, 0L);
+        addPartition(table, 1, 8, 3, 0L);
+
+        assertThat(calculator.totalAllocatedSize(table), is(13L * PAGE_SIZE));
+    }
+
+    @Test
+    void testTotalAllocatedSizeMultipleTables() {
+        PersistentPageMemoryTableStorage table1 = createTable(1);
+        addPartition(table1, 0, 5, 0, 0L);
+
+        PersistentPageMemoryTableStorage table2 = createTable(2);
+        addPartition(table2, 0, 3, 1, 0L);
+
+        assertThat(calculator.totalAllocatedSize(List.of(table1, table2)), 
is(8L * PAGE_SIZE));
+    }
+
+    @Test
+    void testTotalUsedSizeEmptyTable() {
+        PersistentPageMemoryTableStorage table = createTable();
+
+        assertThat(calculator.totalUsedSize(table), is(0L));
+    }
+
+    @Test
+    void testTotalUsedSizeNoEmptyPages() {
+        PersistentPageMemoryTableStorage table = createTable();
+        addPartition(table, 0, 10, 0, 0L);
+
+        assertThat(calculator.totalUsedSize(table), is(10L * PAGE_SIZE));
+    }
+
+    @Test
+    void testTotalUsedSizeWithEmptyPages() {
+        PersistentPageMemoryTableStorage table = createTable();
+        addPartition(table, 0, 10, 3, 500L);
+
+        // Used pages = total - empty = 10 - 3 = 7
+        assertThat(calculator.totalUsedSize(table), is(7L * PAGE_SIZE));
+    }
+
+    @Test
+    void testTotalUsedSizeMultiplePartitions() {
+        PersistentPageMemoryTableStorage table = createTable();
+        addPartition(table, 0, 5, 1, 0L);
+        addPartition(table, 1, 8, 3, 0L);
+
+        // Used pages: (5-1) + (8-3) = 4 + 5 = 9
+        assertThat(calculator.totalUsedSize(table), is(9L * PAGE_SIZE));
+    }
+
+    @Test
+    void testTotalUsedSizeMultipleTables() {
+        PersistentPageMemoryTableStorage table1 = createTable(1);
+        addPartition(table1, 0, 5, 0, 0L);
+
+        PersistentPageMemoryTableStorage table2 = createTable(2);
+        addPartition(table2, 0, 3, 1, 0L);
+
+        // Used pages: 5 + (3-1) = 5 + 2 = 7
+        assertThat(calculator.totalUsedSize(List.of(table1, table2)), is(7L * 
PAGE_SIZE));
+    }
+
+    @Test
+    void testTotalEmptySizeEmptyTable() {
+        PersistentPageMemoryTableStorage table = createTable();
+
+        assertThat(calculator.totalEmptySize(table), is(0L));
+    }
+
+    @Test
+    void testTotalEmptySizeNoEmptyPages() {
+        PersistentPageMemoryTableStorage table = createTable();
+        addPartition(table, 0, 10, 0, 0L);
+
+        assertThat(calculator.totalEmptySize(table), is(0L));
+    }
+
+    @Test
+    void testTotalEmptySizeWithEmptyPages() {
+        PersistentPageMemoryTableStorage table = createTable();
+        addPartition(table, 0, 10, 3, 500L);
+
+        assertThat(calculator.totalEmptySize(table), is(3L * PAGE_SIZE));
+    }
+
+    @Test
+    void testTotalEmptySizeMultiplePartitions() {
+        PersistentPageMemoryTableStorage table = createTable();
+        addPartition(table, 0, 5, 1, 0L);
+        addPartition(table, 1, 8, 3, 0L);
+
+        assertThat(calculator.totalEmptySize(table), is(4L * PAGE_SIZE));
+    }
+
+    @Test
+    void testTotalEmptySizeMultipleTables() {
+        PersistentPageMemoryTableStorage table1 = createTable(1);
+        addPartition(table1, 0, 5, 2, 0L);
+
+        PersistentPageMemoryTableStorage table2 = createTable(2);
+        addPartition(table2, 0, 3, 1, 0L);
+
+        assertThat(calculator.totalEmptySize(List.of(table1, table2)), is(3L * 
PAGE_SIZE));
+    }
+
+    @Test
+    void testAllocatedEqualsUsedPlusEmpty() {
+        PersistentPageMemoryTableStorage table = createTable();
+        addPartition(table, 0, 10, 3, 500L);
+        addPartition(table, 1, 7, 2, 200L);
+
+        long allocated = calculator.totalAllocatedSize(table);
+        long used = calculator.totalUsedSize(table);
+        long empty = calculator.totalEmptySize(table);
+
+        assertThat(allocated, is(used + empty));
+    }
+
+    @Test
+    void testTotalDataSizeEmptyTable() {
+        PersistentPageMemoryTableStorage table = createTable();
+
+        assertThat(calculator.totalDataSize(table), is(0L));
+    }
+
+    @Test
+    void testTotalDataSizeFullPagesNoFreeSpace() {
+        PersistentPageMemoryTableStorage table = createTable();
+        addPartition(table, 0, 10, 0, 0L);
+
+        // Data occupies all used page space
+        assertThat(calculator.totalDataSize(table), is(10L * PAGE_SIZE));
+    }
+
+    @Test
+    void testTotalDataSizeWithEmptyPagesNoFreeSpace() {
+        PersistentPageMemoryTableStorage table = createTable();
+        addPartition(table, 0, 10, 3, 0L);
+
+        // Only non-empty pages count; no fragmentation within those pages
+        assertThat(calculator.totalDataSize(table), is(7L * PAGE_SIZE));
+    }
+
+    @Test
+    void testTotalDataSizeWithFreeSpaceInUsedPages() {
+        PersistentPageMemoryTableStorage table = createTable();
+        addPartition(table, 0, 10, 2, 1500L);
+
+        // Used pages: 10 - 2 = 8. Data = 8 * pageSize - 1500
+        assertThat(calculator.totalDataSize(table), is(8L * PAGE_SIZE - 
1500L));
+    }
+
+    @Test
+    void testTotalDataSizeMultiplePartitions() {
+        PersistentPageMemoryTableStorage table = createTable();
+        addPartition(table, 0, 5, 1, 500L);
+        addPartition(table, 1, 8, 3, 1000L);
+
+        // Partition 0: (5-1)*pageSize - 500 = 4*pageSize - 500
+        // Partition 1: (8-3)*pageSize - 1000 = 5*pageSize - 1000
+        long expected = 4L * PAGE_SIZE - 500L + 5L * PAGE_SIZE - 1000L;
+        assertThat(calculator.totalDataSize(table), is(expected));
+    }
+
+    @Test
+    void testTotalDataSizeMultipleTables() {
+        PersistentPageMemoryTableStorage table1 = createTable(1);
+        addPartition(table1, 0, 5, 0, 0L);
+
+        PersistentPageMemoryTableStorage table2 = createTable(2);
+        addPartition(table2, 0, 3, 1, 200L);
+
+        // Table1: 5*pageSize - 0 = 5*pageSize
+        // Table2: (3-1)*pageSize - 200 = 2*pageSize - 200
+        long expected = 5L * PAGE_SIZE + 2L * PAGE_SIZE - 200L;
+        assertThat(calculator.totalDataSize(List.of(table1, table2)), 
is(expected));
+    }
+
+    @Test
+    void testPagesFillFactorEmptyTableReturnsZero() {
+        PersistentPageMemoryTableStorage table = createTable();
+
+        assertThat(calculator.pagesFillFactor(table), is(0.0));
+        assertThat(calculator.pagesFillFactor(List.of(table)), is(0.0));
+    }
+
+    @Test
+    void testPagesFillFactorAllPagesEmptyReturnsZero() {
+        PersistentPageMemoryTableStorage table = createTable();
+        // All pages are in the empty list, no used pages
+        addPartition(table, 0, 5, 5, 0L);
+
+        assertThat(calculator.pagesFillFactor(table), is(0.0));
+    }
+
+    @Test
+    void testPagesFillFactorUsedPagesNoFreeSpaceReturnsOne() {
+        PersistentPageMemoryTableStorage table = createTable();
+        addPartition(table, 0, 10, 0, 0L);
+
+        assertThat(calculator.pagesFillFactor(table), is(1.0));
+        assertThat(calculator.pagesFillFactor(List.of(table)), is(1.0));
+    }
+
+    @Test
+    void testPagesFillFactorUsedPagesWithFreeSpace() {
+        PersistentPageMemoryTableStorage table = createTable();
+        // 8 used pages, 2048 bytes free space
+        addPartition(table, 0, 8, 0, 2048L);
+
+        long usedBytes = 8L * PAGE_SIZE;
+        double expected = (double) (usedBytes - 2048L) / usedBytes;
+        assertThat(calculator.pagesFillFactor(table), closeTo(expected, 
1.0e-10));
+    }
+
+    @Test
+    void testPagesFillFactorWithEmptyAndUsedPages() {
+        PersistentPageMemoryTableStorage table = createTable();
+        // 10 pages: 3 empty, 7 used with 1000 bytes free
+        addPartition(table, 0, 10, 3, 1000L);
+
+        long usedBytes = 7L * PAGE_SIZE;
+        double expected = (double) (usedBytes - 1000L) / usedBytes;
+        assertThat(calculator.pagesFillFactor(table), closeTo(expected, 
1.0e-10));
+        assertThat(calculator.pagesFillFactor(List.of(table)), 
closeTo(expected, 1.0e-10));
+    }
+
+    @Test
+    void testPagesFillFactorMultiplePartitions() {
+        PersistentPageMemoryTableStorage table = createTable();
+        addPartition(table, 0, 5, 1, 500L);
+        addPartition(table, 1, 8, 3, 1000L);
+
+        // Total used pages: (5-1) + (8-3) = 4 + 5 = 9
+        // Total used bytes: 9 * pageSize
+        // Total free space: 500 + 1000 = 1500
+        long usedBytes = 9L * PAGE_SIZE;
+        long freeBytes = 1500L;
+        double expected = (double) (usedBytes - freeBytes) / usedBytes;
+        assertThat(calculator.pagesFillFactor(table), closeTo(expected, 
1.0e-10));
+    }
+
+    @Test
+    void testPagesFillFactorMultipleTables() {
+        PersistentPageMemoryTableStorage table1 = createTable(1);
+        addPartition(table1, 0, 5, 0, 500L);
+
+        PersistentPageMemoryTableStorage table2 = createTable(2);
+        addPartition(table2, 0, 3, 1, 200L);
+
+        // Used pages: 5 + (3-1) = 7; used bytes: 7 * pageSize; free: 500 + 
200 = 700
+        long usedBytes = 7L * PAGE_SIZE;
+        long freeBytes = 700L;
+        double expected = (double) (usedBytes - freeBytes) / usedBytes;
+        assertThat(calculator.pagesFillFactor(List.of(table1, table2)), 
closeTo(expected, 1.0e-10));
+    }
+
+    @Test
+    void testSingleTableAndCollectionOfOneGiveSameResults() {
+        PersistentPageMemoryTableStorage table = createTable();
+        addPartition(table, 0, 10, 3, 1500L);
+        addPartition(table, 1, 7, 2, 800L);
+
+        assertThat(calculator.totalAllocatedSize(table), 
is(calculator.totalAllocatedSize(List.of(table))));
+        assertThat(calculator.totalUsedSize(table), 
is(calculator.totalUsedSize(List.of(table))));
+        assertThat(calculator.totalEmptySize(table), 
is(calculator.totalEmptySize(List.of(table))));
+        assertThat(calculator.totalDataSize(table), 
is(calculator.totalDataSize(List.of(table))));
+        assertThat(calculator.pagesFillFactor(table), 
is(calculator.pagesFillFactor(List.of(table))));
+    }
+
+    @Test
+    void testEmptyCollectionReturnsZeroForAllMetrics() {
+        assertThat(calculator.totalAllocatedSize(List.of()), is(0L));
+        assertThat(calculator.totalUsedSize(List.of()), is(0L));
+        assertThat(calculator.totalEmptySize(List.of()), is(0L));
+        assertThat(calculator.totalDataSize(List.of()), is(0L));
+        assertThat(calculator.pagesFillFactor(List.of()), is(0.0));
+    }
+
+    private static PersistentPageMemoryTableStorage createTable() {
+        return createTable(TABLE_ID);
+    }
+
+    @SuppressWarnings("DataFlowIssue")
+    private static PersistentPageMemoryTableStorage createTable(int tableId) {
+        return new PersistentPageMemoryTableStorage(
+                new StorageTableDescriptor(tableId, MAX_PARTITIONS, "default"),
+                id -> null,
+                null,
+                null,
+                null,
+                null,
+                null
+        );
+    }
+
+    private static void addPartition(
+            PersistentPageMemoryTableStorage table,
+            int partitionId,
+            int pageCount,
+            int emptyPages,
+            long freeSpace
+    ) {
+        PersistentPageMemoryMvPartitionStorage partition = 
mock(PersistentPageMemoryMvPartitionStorage.class);
+        when(partition.pageCount()).thenReturn(pageCount);
+        when(partition.emptyDataPageCountInFreeList()).thenReturn(emptyPages);
+        when(partition.freeSpaceInFreeList()).thenReturn(freeSpace);
+
+        assertThat(table.mvPartitionStorages.create(partitionId, id -> 
partition), willCompleteSuccessfully());
+    }
+}
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegionMetricsTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegionMetricsTest.java
new file mode 100644
index 00000000000..ac4d771dc44
--- /dev/null
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegionMetricsTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine.ENGINE_NAME;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
+import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import org.apache.ignite.internal.components.NoOpLogSyncer;
+import org.apache.ignite.internal.configuration.SystemLocalConfiguration;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.DoubleMetric;
+import org.apache.ignite.internal.metrics.LongMetric;
+import org.apache.ignite.internal.metrics.TestMetricManager;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.BaseMvStoragesTest;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
+import 
org.apache.ignite.internal.storage.configurations.StorageProfileConfiguration;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
+import org.apache.ignite.internal.storage.gc.GcEntry;
+import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryProfileConfiguration;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.Constants;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ExecutorServiceExtension.class)
+@ExtendWith(WorkDirectoryExtension.class)
+class PersistentPageMemoryDataRegionMetricsTest extends BaseMvStoragesTest {
+    private static final int TABLE_ID = 123;
+    private static final int PARTITION_ID = 12;
+
+    @InjectConfiguration("mock.profiles.default {engine = aipersist, sizeBytes 
= " + 256 * Constants.MiB + "}")
+    private StorageConfiguration storageConfig;
+
+    private PersistentPageMemoryStorageEngine engine;
+
+    private final TestMetricManager metricManager = new TestMetricManager();
+
+    @BeforeEach
+    void setUp(
+            @InjectExecutorService ExecutorService executorService,
+            @InjectConfiguration SystemLocalConfiguration systemConfig,
+            @WorkDirectory Path workDir
+    ) {
+        var ioRegistry = new PageIoRegistry();
+
+        ioRegistry.loadFromServiceLoader();
+
+        engine = new PersistentPageMemoryStorageEngine(
+                "test",
+                metricManager,
+                storageConfig,
+                systemConfig,
+                ioRegistry,
+                workDir,
+                null,
+                new NoOpFailureManager(),
+                new NoOpLogSyncer(),
+                executorService,
+                clock
+        );
+
+        assertThat(metricManager.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
+
+        engine.start();
+    }
+
+    @AfterEach
+    protected void tearDown() throws Exception {
+        closeAllManually(
+                engine == null ? null : engine::stop,
+                () -> assertThat(stopAsync(new ComponentContext(), 
metricManager), willCompleteSuccessfully())
+        );
+    }
+
+    private MvTableStorage createMvTableStorage() {
+        return engine.createMvTable(
+                new StorageTableDescriptor(TABLE_ID, DEFAULT_PARTITION_COUNT, 
DEFAULT_STORAGE_PROFILE),
+                indexId -> null
+        );
+    }
+
+    @Test
+    void testMaxSizeMetric() {
+        LongMetric metric = 
metricManager.metric(defaultProfileMetricSourceName(), "MaxSize");
+
+        assertThat(metric.value(), 
is(defaultProfileConfig().sizeBytes().value()));
+    }
+
+    @Test
+    void testMaxSizeMetricAfterChangeConfig() {
+        PersistentPageMemoryProfileConfiguration defaultProfileConfig = 
defaultProfileConfig();
+
+        long sizeBytesBeforeChange = defaultProfileConfig.sizeBytes().value();
+
+        assertThat(defaultProfileConfig.sizeBytes().update(2 * 
sizeBytesBeforeChange), willCompleteSuccessfully());
+
+        LongMetric metric = 
metricManager.metric(defaultProfileMetricSourceName(), "MaxSize");
+
+        assertThat(metric.value(), is(sizeBytesBeforeChange));
+    }
+
+    @Test
+    void testRegionMetrics() {
+        LongMetric totalAllocatedSize = 
metricManager.metric(defaultProfileMetricSourceName(), "TotalAllocatedSize");
+        LongMetric totalUsedSize = 
metricManager.metric(defaultProfileMetricSourceName(), "TotalUsedSize");
+        LongMetric totalEmptySize = 
metricManager.metric(defaultProfileMetricSourceName(), "TotalEmptySize");
+        LongMetric totalDataSize = 
metricManager.metric(defaultProfileMetricSourceName(), "TotalDataSize");
+        DoubleMetric pagesFillFactor = 
metricManager.metric(defaultProfileMetricSourceName(), "PagesFillFactor");
+
+        assertThat(totalAllocatedSize.value(), is(0L));
+        assertThat(totalUsedSize.value(), is(0L));
+        assertThat(totalEmptySize.value(), is(0L));
+        assertThat(totalDataSize.value(), is(0L));
+        assertThat(pagesFillFactor.value(), is(0.0));
+
+        MvTableStorage tableStorage = createMvTableStorage();
+
+        MvPartitionStorage partitionStorage = 
getOrCreateMvPartition(tableStorage, PARTITION_ID);
+
+        // After partition creation, only system pages exist (meta, free list 
header, etc.).
+        // No data pages are in the free list yet, so freeSpace = 0 and 
emptyDataPages = 0.
+        long allocatedSizeEmptyPartition = totalAllocatedSize.value();
+
+        assertThat(allocatedSizeEmptyPartition, greaterThan(0L));
+
+        assertThat(totalUsedSize.value(), is(allocatedSizeEmptyPartition));
+        assertThat(totalEmptySize.value(), is(0L));
+        assertThat(totalDataSize.value(), is(allocatedSizeEmptyPartition));
+        assertThat(pagesFillFactor.value(), is(1.0));
+
+        // Write a large row that forces fragmentation (payload exceeds a single data page's capacity).
+        // After GC, fully filled pages will be added to the reuse bucket, so we can guarantee that totalEmptySize > 0 after vacuum.
+        String largeStr = "a".repeat((int) pageSize());
+
+        var rowId = new RowId(PARTITION_ID);
+
+        BinaryRow row = binaryRow(new TestKey(0, largeStr), new TestValue(1, 
largeStr));
+
+        addWriteCommitted(partitionStorage, rowId, row);
+
+        assertThat(totalAllocatedSize.value(), is(totalUsedSize.value() + 
totalEmptySize.value()));
+        assertThat(totalUsedSize.value(), 
greaterThan(allocatedSizeEmptyPartition));
+        assertThat(totalEmptySize.value(), is(0L));
+        assertThat(totalDataSize.value(), allOf(greaterThan(0L), 
lessThan(totalUsedSize.value())));
+        assertThat(pagesFillFactor.value(), allOf(greaterThan(0.0), 
lessThan(1.0)));
+
+        // Write a tombstone for the same row to create a GC entry, then 
vacuum it.
+        addWriteCommitted(partitionStorage, rowId, null);
+
+        vacuum(partitionStorage);
+
+        assertThat(totalAllocatedSize.value(), equalTo(totalUsedSize.value() + 
totalEmptySize.value()));
+        assertThat(totalUsedSize.value(), greaterThan(0L));
+        assertThat(totalEmptySize.value(), greaterThan(0L));
+        assertThat(totalDataSize.value(), allOf(greaterThan(0L), 
lessThan(totalUsedSize.value())));
+        assertThat(pagesFillFactor.value(), allOf(greaterThan(0.0), 
lessThan(1.0)));
+    }
+
+    private PersistentPageMemoryProfileConfiguration defaultProfileConfig() {
+        StorageProfileConfiguration config = 
storageConfig.profiles().get(DEFAULT_STORAGE_PROFILE);
+
+        assertNotNull(config);
+        assertInstanceOf(PersistentPageMemoryProfileConfiguration.class, 
config);
+
+        return (PersistentPageMemoryProfileConfiguration) config;
+    }
+
+    private String defaultProfileMetricSourceName() {
+        return "storage." + ENGINE_NAME + "." + 
defaultProfileConfig().name().value();
+    }
+
+    private long pageSize() {
+        return engine.configuration().pageSizeBytes().value();
+    }
+
+    private void addWriteCommitted(MvPartitionStorage storage, RowId rowId, 
@Nullable BinaryRow row) {
+        storage.runConsistently(locker -> {
+            locker.lock(rowId);
+
+            storage.addWriteCommitted(rowId, row, clock.now());
+
+            return null;
+        });
+    }
+
+    private static void vacuum(MvPartitionStorage storage) {
+        storage.runConsistently(locker -> {
+            List<GcEntry> entries = storage.peek(HybridTimestamp.MAX_VALUE, 1);
+
+            assertThat(entries, hasSize(1));
+
+            GcEntry gcEntry = entries.get(0);
+
+            locker.lock(gcEntry.getRowId());
+
+            storage.vacuum(gcEntry);
+
+            return null;
+        });
+    }
+}
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
index 50bfc108a9d..32016a448f1 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
@@ -22,21 +22,16 @@ import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
 import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGES_SORTED;
-import static 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine.ENGINE_NAME;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.emptyArray;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
@@ -56,26 +51,20 @@ import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.RunnableX;
-import org.apache.ignite.internal.manager.ComponentContext;
-import org.apache.ignite.internal.metrics.LongMetric;
-import org.apache.ignite.internal.metrics.TestMetricManager;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
-import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
 import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
 import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState;
 import 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTimeoutLock;
-import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.AbstractMvTableStorageTest;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
-import 
org.apache.ignite.internal.storage.configurations.StorageProfileConfiguration;
 import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
 import org.apache.ignite.internal.storage.lease.LeaseInfo;
-import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryProfileConfiguration;
 import 
org.apache.ignite.internal.storage.pagememory.mv.PersistentPageMemoryMvPartitionStorage;
 import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
@@ -108,8 +97,6 @@ public class PersistentPageMemoryMvTableStorageTest extends 
AbstractMvTableStora
     @InjectExecutorService
     private ExecutorService executorService;
 
-    private TestMetricManager metricManager;
-
     @WorkDirectory
     private Path workDir;
 
@@ -119,11 +106,9 @@ public class PersistentPageMemoryMvTableStorageTest 
extends AbstractMvTableStora
 
         ioRegistry.loadFromServiceLoader();
 
-        metricManager = new TestMetricManager();
-
         engine = new PersistentPageMemoryStorageEngine(
                 "test",
-                metricManager,
+                new NoOpMetricManager(),
                 storageConfig,
                 systemConfig,
                 ioRegistry,
@@ -137,8 +122,6 @@ public class PersistentPageMemoryMvTableStorageTest extends 
AbstractMvTableStora
 
         engine.start();
 
-        assertThat(metricManager.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
-
         initialize();
     }
 
@@ -147,10 +130,7 @@ public class PersistentPageMemoryMvTableStorageTest 
extends AbstractMvTableStora
     protected void tearDown() throws Exception {
         super.tearDown();
 
-        IgniteUtils.closeAllManually(
-                () -> assertThat(metricManager.stopAsync(new 
ComponentContext()), willCompleteSuccessfully()),
-                engine == null ? null : engine::stop
-        );
+        IgniteUtils.closeAllManually(engine == null ? null : engine::stop);
     }
 
     @Override
@@ -199,96 +179,10 @@ public class PersistentPageMemoryMvTableStorageTest extends AbstractMvTableStora
         }
     }
 
-    @Test
-    void testMaxSizeMetric() {
-        LongMetric metric = (LongMetric) metricManager.metric(defaultProfileMetricSourceName(), "MaxSize");
-
-        assertNotNull(metric);
-        assertEquals(defaultProfileConfig().sizeBytes().value(), metric.value());
-    }
-
-    @Test
-    void testMaxSizeMetricAfterChangeConfig() {
-        PersistentPageMemoryProfileConfiguration defaultProfileConfig = defaultProfileConfig();
-
-        Long sizeBytesBeforeChange = defaultProfileConfig.sizeBytes().value();
-        assertThat(defaultProfileConfig.sizeBytes().update(2 * sizeBytesBeforeChange), willCompleteSuccessfully());
-
-        LongMetric metric = (LongMetric) metricManager.metric(defaultProfileMetricSourceName(), "MaxSize");
-
-        assertNotNull(metric);
-        assertEquals(sizeBytesBeforeChange, metric.value());
-    }
-
-    @Test
-    void testTotalAllocatedSize() {
-        LongMetric metric = (LongMetric) metricManager.metric(defaultProfileMetricSourceName(), "TotalAllocatedSize");
-
-        assertNotNull(metric);
-        assertEquals(0L, metric.value());
-
-        PersistentPageMemoryMvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
-        assertThat(metric.value(), allOf(greaterThan(0L), equalTo(totalAllocatedSizeInBytes(PARTITION_ID))));
-
-        addWriteCommitted(mvPartitionStorage);
-        assertThat(metric.value(), allOf(greaterThan(0L), equalTo(totalAllocatedSizeInBytes(PARTITION_ID))));
-    }
-
-    @Test
-    void testTotalUsedSize() {
-        LongMetric metric = (LongMetric) metricManager.metric(defaultProfileMetricSourceName(), "TotalUsedSize");
-
-        assertNotNull(metric);
-        assertEquals(0L, metric.value());
-
-        PersistentPageMemoryMvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
-        assertThat(metric.value(), allOf(greaterThan(0L), equalTo(totalUsedSizeInBytes(PARTITION_ID))));
-
-        addWriteCommitted(mvPartitionStorage);
-        assertThat(metric.value(), allOf(greaterThan(0L), equalTo(totalUsedSizeInBytes(PARTITION_ID))));
-    }
-
-    private PersistentPageMemoryProfileConfiguration defaultProfileConfig() {
-        StorageProfileConfiguration config = storageConfig.profiles().get(DEFAULT_STORAGE_PROFILE);
-
-        assertNotNull(config);
-        assertInstanceOf(PersistentPageMemoryProfileConfiguration.class, config);
-
-        return (PersistentPageMemoryProfileConfiguration) config;
-    }
-
-    private String defaultProfileMetricSourceName() {
-        return "storage." + ENGINE_NAME + "." + defaultProfileConfig().name().value();
-    }
-
     private long pageSize() {
         return engine.configuration().pageSizeBytes().value();
     }
 
-    private long filePageStorePageCount(int partitionId) {
-        PersistentPageMemoryTableStorage tableStorage = (PersistentPageMemoryTableStorage) this.tableStorage;
-
-        FilePageStore store = tableStorage.dataRegion().filePageStoreManager().getStore(new GroupPartitionId(TABLE_ID, partitionId));
-
-        return store == null ? 0 : store.pages();
-    }
-
-    private long freeListEmptyPageCount(int partitionId) {
-        PersistentPageMemoryTableStorage tableStorage = (PersistentPageMemoryTableStorage) this.tableStorage;
-
-        PersistentPageMemoryMvPartitionStorage storage = (PersistentPageMemoryMvPartitionStorage) tableStorage.getMvPartition(partitionId);
-
-        return storage == null ? 0L : storage.emptyDataPageCountInFreeList();
-    }
-
-    private long totalAllocatedSizeInBytes(int partitionId) {
-        return pageSize() * filePageStorePageCount(partitionId);
-    }
-
-    private long totalUsedSizeInBytes(int partitionId) {
-        return pageSize() * (filePageStorePageCount(partitionId) - freeListEmptyPageCount(partitionId));
-    }
-
     private void addWriteCommitted(PersistentPageMemoryMvPartitionStorage... storages) {
         assertThat(storages, not(emptyArray()));
 
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java
index d658cbd2fe7..f5fbd5ecc38 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java
@@ -17,27 +17,33 @@
 
 package org.apache.ignite.internal.storage.pagememory.engine;
 
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.StreamSupport.stream;
 import static org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfigurationSchema.DEFAULT_PAGE_SIZE;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.mockito.Mockito.mock;
 
 import java.nio.file.Path;
-import java.util.Iterator;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.configuration.SystemLocalConfiguration;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.failure.FailureManager;
+import org.apache.ignite.internal.metrics.DoubleMetric;
 import org.apache.ignite.internal.metrics.LongMetric;
 import org.apache.ignite.internal.metrics.Metric;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
 import org.apache.ignite.internal.storage.configurations.StorageProfileView;
 import org.apache.ignite.internal.storage.engine.AbstractPersistentStorageEngineTest;
@@ -46,6 +52,7 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageEngine;
 import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
 import org.apache.ignite.internal.storage.metrics.StorageEngineTablesMetricSource;
+import org.apache.ignite.internal.storage.metrics.StorageEngineTablesMetricSource.Holder;
 import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
 import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryProfileView;
 import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
@@ -133,26 +140,69 @@ public class PersistentPageMemoryStorageEngineTest extends AbstractPersistentSto
         storageEngine.addTableMetrics(tableDescriptor, metricSource);
 
         metricSource.enable();
-        Iterator<Metric> metrics = metricSource.holder().metrics().iterator();
-        assertTrue(metrics.hasNext());
 
-        Metric metric = metrics.next();
-        assertThat(metric.name(), is("TotalAllocatedSize"));
-        assertThat(metric, is(instanceOf(LongMetric.class)));
+        Holder holder = metricSource.holder();
 
-        assertFalse(metrics.hasNext());
+        assertThat(holder, is(notNullValue()));
 
-        LongMetric totalAllocatedSize = (LongMetric) metric;
-        assertEquals(0, totalAllocatedSize.value());
+        Map<String, Metric> metricByName = stream(holder.metrics().spliterator(), false)
+                .collect(toMap(Metric::name, Function.identity()));
+
+        LongMetric totalAllocatedSize = (LongMetric) metricByName.get("TotalAllocatedSize");
+        LongMetric totalUsedSize = (LongMetric) metricByName.get("TotalUsedSize");
+        LongMetric totalEmptySize = (LongMetric) metricByName.get("TotalEmptySize");
+        LongMetric totalDataSize = (LongMetric) metricByName.get("TotalDataSize");
+        DoubleMetric pagesFillFactor = (DoubleMetric) metricByName.get("PagesFillFactor");
+
+        assertThat(totalAllocatedSize.value(), is(0L));
+        assertThat(totalUsedSize.value(), is(0L));
+        assertThat(totalEmptySize.value(), is(0L));
+        assertThat(totalDataSize.value(), is(0L));
+        assertThat(pagesFillFactor.value(), is(0.0));
 
         var otherTableDescriptor = new StorageTableDescriptor(20, 1, CatalogService.DEFAULT_STORAGE_PROFILE);
         MvTableStorage otherTable = engine.createMvTable(otherTableDescriptor, indexId -> null);
 
-        otherTable.createMvPartition(0);
-        assertEquals(0, totalAllocatedSize.value());
+        assertThat(otherTable.createMvPartition(0), willCompleteSuccessfully());
+
+        assertThat(totalAllocatedSize.value(), is(0L));
+        assertThat(totalUsedSize.value(), is(0L));
+        assertThat(totalEmptySize.value(), is(0L));
+        assertThat(totalDataSize.value(), is(0L));
+        assertThat(pagesFillFactor.value(), is(0.0));
+
+        assertThat(table.createMvPartition(0), willCompleteSuccessfully());
 
-        table.createMvPartition(0);
         assertThat(totalAllocatedSize.value(), is(greaterThan(0L)));
-        assertEquals(0, totalAllocatedSize.value() % DEFAULT_PAGE_SIZE);
+        assertThat(totalAllocatedSize.value() % DEFAULT_PAGE_SIZE, is(0L));
+        assertThat(totalUsedSize.value(), is(totalAllocatedSize.value()));
+        assertThat(totalEmptySize.value(), is(0L));
+        assertThat(totalDataSize.value(), is(totalAllocatedSize.value()));
+        assertThat(pagesFillFactor.value(), is(1.0));
+
+        long totalAllocatedSizeBeforeData = totalAllocatedSize.value();
+
+        MvPartitionStorage partitionStorage = table.getMvPartition(0);
+
+        assertThat(partitionStorage, is(notNullValue()));
+
+        partitionStorage.runConsistently(locker -> {
+            RowId rowId = RowId.lowestRowId(0);
+
+            locker.lock(rowId);
+
+            var row = binaryRow(new TestKey(0, "foo"), new TestValue(1, "bar"));
+
+            partitionStorage.addWriteCommitted(rowId, row, clock.now());
+
+            return null;
+        });
+
+        assertThat(totalAllocatedSize.value(), is(greaterThan(totalAllocatedSizeBeforeData)));
+        assertThat(totalAllocatedSize.value() % DEFAULT_PAGE_SIZE, is(0L));
+        assertThat(totalUsedSize.value(), is(totalAllocatedSize.value()));
+        assertThat(totalEmptySize.value(), is(0L));
+        assertThat(totalDataSize.value(), is(lessThan(totalAllocatedSize.value())));
+        assertThat(pagesFillFactor.value(), is(allOf(greaterThan(0.0), lessThan(1.0))));
     }
 }
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
index 57cd8afee8a..d144d5c037b 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
@@ -33,7 +33,7 @@ import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.lowwatermark.LowWatermark;
-import org.apache.ignite.internal.metrics.TestMetricManager;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
@@ -167,7 +167,7 @@ public class ItLockTableTest extends IgniteAbstractTest {
                         transactionInflights,
                         lowWatermark,
                         commonExecutor,
-                        new TestMetricManager()
+                        new NoOpMetricManager()
                 );
             }
         };
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index f332642e032..f4af6166011 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -42,7 +42,7 @@ import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lowwatermark.LowWatermark;
-import org.apache.ignite.internal.metrics.TestMetricManager;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.InternalClusterNode;
@@ -157,7 +157,7 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends TxAbstractTes
                         transactionInflights,
                         lowWatermark,
                         commonExecutor,
-                        new TestMetricManager()
+                        new NoOpMetricManager()
                 ) {
                     @Override
                     public Executor writeIntentSwitchExecutor() {

Reply via email to